diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java index b4c85a62fcb..4b3d63d2e53 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java @@ -96,24 +96,25 @@ private static boolean looksLikeJson(byte[] body) { } int i = 0; // Skip optional UTF-8 BOM (0xEF, 0xBB, 0xBF) - if (body.length >= 3 - && (body[0] & 0xFF) == 0xEF - && (body[1] & 0xFF) == 0xBB - && (body[2] & 0xFF) == 0xBF) { + if (body.length >= 3 && (body[0] & 0xFF) == 0xEF && (body[1] & 0xFF) == 0xBB && (body[2] & 0xFF) == 0xBF) { i = 3; } // skip leading whitespace (including Unicode NBSP / BOM that survived as whitespace) - while (i < body.length && (body[i] == ' ' || body[i] == '\t' - || body[i] == '\r' || body[i] == '\n')) { + while (i < body.length && (body[i] == ' ' || body[i] == '\t' || body[i] == '\r' || body[i] == '\n')) { i++; } if (i >= body.length) { return true; } byte first = body[i]; - return first == '{' || first == '[' || first == '"' - || first == 't' || first == 'f' || first == 'n' - || (first >= '0' && first <= '9') || first == '-'; + return first == '{' + || first == '[' + || first == '"' + || first == 't' + || first == 'f' + || first == 'n' + || (first >= '0' && first <= '9') + || first == '-'; } @Override @@ -158,19 +159,18 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo if (node.getRole() == ClusterRole.LEADER) { headers.add(RAFT_GROUP_HEADER, node.getUnit()); } - Collections.list(request.getHeaderNames()) - .forEach(headerName -> { - if (!HttpHeaders.HOST.equalsIgnoreCase(headerName) - && !HttpHeaders.CONNECTION.equalsIgnoreCase(headerName) - && !"Keep-Alive".equalsIgnoreCase(headerName) - && !HttpHeaders.PROXY_AUTHENTICATE.equalsIgnoreCase(headerName) - && !HttpHeaders.PROXY_AUTHORIZATION.equalsIgnoreCase(headerName) - && !HttpHeaders.TE.equalsIgnoreCase(headerName) - && !HttpHeaders.TRAILER.equalsIgnoreCase(headerName) - && !HttpHeaders.UPGRADE.equalsIgnoreCase(headerName)) { - headers.add(headerName, request.getHeader(headerName)); - } - }); + Collections.list(request.getHeaderNames()).forEach(headerName -> { + if (!HttpHeaders.HOST.equalsIgnoreCase(headerName) + && !HttpHeaders.CONNECTION.equalsIgnoreCase(headerName) + && !"Keep-Alive".equalsIgnoreCase(headerName) + && !HttpHeaders.PROXY_AUTHENTICATE.equalsIgnoreCase(headerName) + && !HttpHeaders.PROXY_AUTHORIZATION.equalsIgnoreCase(headerName) + && !HttpHeaders.TE.equalsIgnoreCase(headerName) + && !HttpHeaders.TRAILER.equalsIgnoreCase(headerName) + && !HttpHeaders.UPGRADE.equalsIgnoreCase(headerName)) { + headers.add(headerName, request.getHeader(headerName)); + } + }); // Create the HttpEntity with headers and body HttpMethod httpMethod; @@ -197,7 +197,8 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo // headers-only for empty body httpEntity = new HttpEntity<>(headers); } else { - // Remove potentially stale length/transfer headers and let the client recompute them + // Remove potentially stale length/transfer headers and let the client recompute + // them headers.remove(HttpHeaders.CONTENT_LENGTH); headers.remove(HttpHeaders.TRANSFER_ENCODING); httpEntity = new HttpEntity<>(body, headers); @@ -205,8 +206,10 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo } try { - ResponseEntity responseEntity = restTemplate.exchange(URI.create(targetUrl), httpMethod, httpEntity, byte[].class); - //Copy headers from proxied response, skipping hop-by-hop and headers we manage ourselves to mitigate + ResponseEntity responseEntity = restTemplate.exchange( + URI.create(targetUrl), httpMethod, httpEntity, byte[].class); + // Copy headers from proxied response, skipping hop-by-hop and headers we manage + // ourselves to mitigate // security risks from Content-Type manipulation responseEntity.getHeaders().forEach((key, value) -> { if (!HttpHeaders.CONTENT_TYPE.equalsIgnoreCase(key) @@ -225,7 +228,8 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo }); // Force a safe Content-Type: reject HTML/XML types that could // execute scripts; fall back to application/json - String proxiedContentType = responseEntity.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE); + String proxiedContentType = + responseEntity.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE); String safeContentType; if (isSafeContentType(proxiedContentType)) { safeContentType = proxiedContentType; @@ -234,16 +238,20 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo } response.setContentType(safeContentType); response.setHeader("X-Content-Type-Options", "nosniff"); - response.setStatus(responseEntity.getStatusCode().value()); + response.setStatus( + responseEntity.getStatusCode().value()); byte[] responseBody = responseEntity.getBody(); // HEAD responses must not include a message body (RFC 7231 §4.3.2) if (!HttpMethod.HEAD.equals(httpMethod) - && responseBody != null && responseBody.length > 0) { + && responseBody != null + && responseBody.length > 0) { // For JSON content type, validate that the body actually looks // like JSON to prevent XSS via crafted upstream responses if (safeContentType.toLowerCase(Locale.ROOT).contains("application/json") && !looksLikeJson(responseBody)) { - LOGGER.warn("Upstream returned non-JSON body for Content-Type {}, replacing with error response", safeContentType); + LOGGER.warn( + "Upstream returned non-JSON body for Content-Type {}, replacing with error response", + safeContentType); response.setStatus(HttpServletResponse.SC_BAD_GATEWAY); response.setContentType("application/json;charset=UTF-8"); responseBody = "{\"error\":\"Upstream returned invalid response body\"}" @@ -256,7 +264,10 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo // Client likely disconnected (broken pipe); log at debug // level and do NOT attempt sendError – the response may // already be committed. - LOGGER.debug("Failed to write proxy response body (client disconnect?): {}", e.getMessage(), e); + LOGGER.debug( + "Failed to write proxy response body (client disconnect?): {}", + e.getMessage(), + e); } } } catch (Exception ex) { diff --git a/namingserver/src/test/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilterTest.java b/namingserver/src/test/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilterTest.java index 0efa2da41cd..7ed65ba1571 100644 --- a/namingserver/src/test/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilterTest.java +++ b/namingserver/src/test/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilterTest.java @@ -75,8 +75,7 @@ void setUp() { NamingServerNode node = new NamingServerNode(); node.setControl(new Node.Endpoint(TARGET_HOST, TARGET_PORT, "http")); - when(namingManager.getInstances(NAMESPACE, CLUSTER)) - .thenReturn(Collections.singletonList(node)); + when(namingManager.getInstances(NAMESPACE, CLUSTER)).thenReturn(Collections.singletonList(node)); } /** @@ -97,9 +96,7 @@ void getRequestWithBodyShouldStripBody() throws Exception { HttpHeaders responseHeaders = new HttpHeaders(); responseHeaders.set(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8"); ResponseEntity upstreamResponse = new ResponseEntity<>( - "{\"result\":\"ok\"}".getBytes(StandardCharsets.UTF_8), - responseHeaders, - HttpStatus.OK); + "{\"result\":\"ok\"}".getBytes(StandardCharsets.UTF_8), responseHeaders, HttpStatus.OK); when(restTemplate.exchange(any(URI.class), eq(HttpMethod.GET), any(HttpEntity.class), eq(byte[].class))) .thenReturn(upstreamResponse); @@ -115,9 +112,11 @@ void getRequestWithBodyShouldStripBody() throws Exception { // Body must be null (stripped for GET) assertNull(capturedEntity.getBody(), "GET request body should be stripped (null)"); // Content-Length and Transfer-Encoding headers must not be forwarded - assertNull(capturedEntity.getHeaders().get(HttpHeaders.CONTENT_LENGTH), + assertNull( + capturedEntity.getHeaders().get(HttpHeaders.CONTENT_LENGTH), "Content-Length header should be removed for GET"); - assertNull(capturedEntity.getHeaders().get(HttpHeaders.TRANSFER_ENCODING), + assertNull( + capturedEntity.getHeaders().get(HttpHeaders.TRANSFER_ENCODING), "Transfer-Encoding header should be removed for GET"); // Verify filterChain was NOT invoked (proxied) @@ -137,8 +136,7 @@ void headRequestShouldStripBody() throws Exception { HttpHeaders responseHeaders = new HttpHeaders(); responseHeaders.set(HttpHeaders.CONTENT_TYPE, "application/json"); - ResponseEntity upstreamResponse = new ResponseEntity<>( - null, responseHeaders, HttpStatus.OK); + ResponseEntity upstreamResponse = new ResponseEntity<>(null, responseHeaders, HttpStatus.OK); when(restTemplate.exchange(any(URI.class), eq(HttpMethod.HEAD), any(HttpEntity.class), eq(byte[].class))) .thenReturn(upstreamResponse); @@ -167,9 +165,7 @@ void postRequestShouldForwardBody() throws Exception { HttpHeaders responseHeaders = new HttpHeaders(); responseHeaders.set(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8"); ResponseEntity upstreamResponse = new ResponseEntity<>( - "{\"result\":\"created\"}".getBytes(StandardCharsets.UTF_8), - responseHeaders, - HttpStatus.OK); + "{\"result\":\"created\"}".getBytes(StandardCharsets.UTF_8), responseHeaders, HttpStatus.OK); when(restTemplate.exchange(any(URI.class), eq(HttpMethod.POST), any(HttpEntity.class), eq(byte[].class))) .thenReturn(upstreamResponse); @@ -182,7 +178,8 @@ void postRequestShouldForwardBody() throws Exception { byte[] capturedBody = entityCaptor.getValue().getBody(); assertNotNull(capturedBody, "POST body should not be null"); - assertEquals(new String(bodyBytes, StandardCharsets.UTF_8), + assertEquals( + new String(bodyBytes, StandardCharsets.UTF_8), new String(capturedBody, StandardCharsets.UTF_8), "POST request body should be forwarded as-is"); } @@ -215,16 +212,14 @@ void nonJsonBodyWithJsonContentTypeShouldReturn502() throws Exception { responseHeaders.set(HttpHeaders.CONTENT_TYPE, "application/json"); // Upstream sends HTML disguised as JSON byte[] htmlBody = "".getBytes(StandardCharsets.UTF_8); - ResponseEntity upstreamResponse = new ResponseEntity<>( - htmlBody, responseHeaders, HttpStatus.OK); + ResponseEntity upstreamResponse = new ResponseEntity<>(htmlBody, responseHeaders, HttpStatus.OK); when(restTemplate.exchange(any(URI.class), eq(HttpMethod.GET), any(HttpEntity.class), eq(byte[].class))) .thenReturn(upstreamResponse); filter.doFilter(request, response, filterChain); - assertEquals(502, response.getStatus(), - "Should return 502 when upstream body is not valid JSON"); + assertEquals(502, response.getStatus(), "Should return 502 when upstream body is not valid JSON"); String body = response.getContentAsString(); assertEquals("{\"error\":\"Upstream returned invalid response body\"}", body); } @@ -240,4 +235,3 @@ private MockHttpServletRequest createConsoleRequest(String method) { return request; } } - diff --git a/rm-datasource/pom.xml b/rm-datasource/pom.xml index c08172c2943..0386cb4c0ec 100644 --- a/rm-datasource/pom.xml +++ b/rm-datasource/pom.xml @@ -164,5 +164,15 @@ json-common-core ${project.version} + + ${project.groupId} + seata-metrics-core + ${project.version} + + + ${project.groupId} + seata-metrics-registry-compact + ${project.version} + diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java index 2d9e525fd3f..39f7931f76d 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java @@ -28,6 +28,8 @@ import org.apache.seata.core.exception.BranchTransactionException; import org.apache.seata.core.exception.TransactionException; import org.apache.seata.core.rpc.processor.Pair; +import org.apache.seata.metrics.registry.Registry; +import org.apache.seata.metrics.registry.RegistryFactory; import org.apache.seata.rm.datasource.ConnectionContext; import org.apache.seata.rm.datasource.ConnectionProxy; import org.apache.seata.rm.datasource.DataSourceProxy; @@ -47,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_ENABLE; import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_THRESHOLD; @@ -59,6 +62,21 @@ public abstract class AbstractUndoLogManager implements UndoLogManager { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractUndoLogManager.class); + /** + * Cached metrics registry instance to avoid repeated RegistryFactory lookups. + */ + private static volatile Registry CACHED_REGISTRY; + + /** + * Flag indicating that the metrics registry is unavailable and should not be retried. + */ + private static volatile boolean REGISTRY_UNAVAILABLE; + + /** + * Lock object for lazy initialization of the metrics registry. + */ + private static final Object REGISTRY_INIT_LOCK = new Object(); + protected enum State { /** * This state can be properly rolled back by services @@ -130,6 +148,7 @@ public static void removeCurrentSerializer() { */ @Override public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException { + long start = System.nanoTime(); try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL); PreparedStatement deleteSubPST = conn.prepareStatement(DELETE_SUB_UNDO_LOG_SQL)) { deletePST.setLong(1, branchId); @@ -139,6 +158,14 @@ public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQL deleteSubPST.setString(1, UndoLogConstants.BRANCH_ID_KEY + CollectionUtils.KV_SPLIT + branchId); deleteSubPST.setString(2, xid); deleteSubPST.executeUpdate(); + + Registry registry = getRegistry(); + if (registry != null) { + registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY) + .record(System.nanoTime() - start, TimeUnit.NANOSECONDS); + registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT) + .increase(1); + } } catch (Exception e) { if (!(e instanceof SQLException)) { e = new SQLException(e); @@ -159,6 +186,7 @@ public void batchDeleteUndoLog(Set xids, Set branchIds, Connection if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) { return; } + long start = System.nanoTime(); int xidSize = xids.size(); int branchIdSize = branchIds.size(); String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize); @@ -185,6 +213,16 @@ public void batchDeleteUndoLog(Set xids, Set branchIds, Connection if (LOGGER.isDebugEnabled()) { LOGGER.debug("batch delete sub undo log size {}", deleteSubRows); } + + Registry registry = getRegistry(); + if (registry != null) { + registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY) + .record(System.nanoTime() - start, TimeUnit.NANOSECONDS); + if (deleteRows > 0) { + registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT) + .increase(1); + } + } } catch (Exception e) { if (!(e instanceof SQLException)) { e = new SQLException(e); @@ -300,6 +338,10 @@ public void flushUndoLogs(ConnectionProxy cp) throws SQLException { String rollbackCtx = buildContext(parser.getName(), compressorType, UndoLogConstants.MAX_ALLOWED_PACKET, maxAllowedPacket); insertUndoLogWithNormal(xid, branchId, rollbackCtx, undoLogContent, cp.getTargetConnection()); + Registry registry = getRegistry(); + if (registry != null) { + registry.getSummary(UndoLogConstants.SUMMARY_UNDO_LOG_SIZE).increase(undoLogContent.length); + } } /** @@ -586,4 +628,40 @@ public boolean hasUndoLogTable(Connection conn) { protected String getCheckUndoLogTableExistSql() { return CHECK_UNDO_LOG_TABLE_EXIST_SQL; } + + /** + * Get the metrics registry instance. + * This method is protected to allow testing with Mockito spy. + * + * @return the Registry instance, or null if metrics are not enabled + */ + protected Registry getRegistry() { + // Fast path: return cached values without synchronization. + if (REGISTRY_UNAVAILABLE) { + return null; + } + Registry registry = CACHED_REGISTRY; + if (registry != null) { + return registry; + } + // Lazy initialization with synchronization. + synchronized (REGISTRY_INIT_LOCK) { + if (REGISTRY_UNAVAILABLE) { + return null; + } + if (CACHED_REGISTRY != null) { + return CACHED_REGISTRY; + } + try { + registry = RegistryFactory.getInstance(); + CACHED_REGISTRY = registry; + return registry; + } catch (Throwable t) { + // Mark as unavailable to avoid repeated initialization attempts and log noise. + REGISTRY_UNAVAILABLE = true; + LOGGER.warn("Failed to initialize metrics registry.", t); + return null; + } + } + } } diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java index 9585a28ff0b..2158fab620e 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java @@ -18,6 +18,8 @@ import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; +import org.apache.seata.metrics.Id; +import org.apache.seata.metrics.IdConstants; import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION; @@ -38,4 +40,19 @@ public interface UndoLogConstants { String SUB_SPLIT_KEY = ","; String MAX_ALLOWED_PACKET = "map"; + + Id SUMMARY_UNDO_LOG_SIZE = new Id("seata.undo.log") + .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM) + .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY) + .withTag("type", "size"); + + Id TIMER_UNDO_LOG_DELETE_LATENCY = new Id("seata.undo.log") + .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM) + .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER) + .withTag("type", "delete_latency"); + + Id COUNTER_UNDO_LOG_DELETE_COUNT = new Id("seata.undo.log") + .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM) + .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_COUNTER) + .withTag("type", "delete_count"); } diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/UndoLogMetricsTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/UndoLogMetricsTest.java new file mode 100644 index 00000000000..ebc1b7c68aa --- /dev/null +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/UndoLogMetricsTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.rm.datasource.undo; + +import org.apache.seata.metrics.Counter; +import org.apache.seata.metrics.Id; +import org.apache.seata.metrics.Summary; +import org.apache.seata.metrics.Timer; +import org.apache.seata.metrics.registry.Registry; +import org.apache.seata.rm.datasource.ConnectionContext; +import org.apache.seata.rm.datasource.ConnectionProxy; +import org.apache.seata.rm.datasource.DataSourceProxy; +import org.apache.seata.rm.datasource.undo.mysql.MySQLUndoLogManager; +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class UndoLogMetricsTest { + + @Test + public void testFlushUndoLogsMetrics() throws Exception { + Registry registry = mock(Registry.class); + Summary summary = mock(Summary.class); + when(registry.getSummary(any(Id.class))).thenReturn(summary); + + ConnectionProxy connectionProxy = mock(ConnectionProxy.class); + ConnectionContext context = mock(ConnectionContext.class); + DataSourceProxy dataSourceProxy = mock(DataSourceProxy.class); + + when(connectionProxy.getContext()).thenReturn(context); + when(connectionProxy.getDataSourceProxy()).thenReturn(dataSourceProxy); + Connection connection = mock(Connection.class); + PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement); + when(connectionProxy.getTargetConnection()).thenReturn(connection); + when(dataSourceProxy.getResourceId()).thenReturn("jdbc:mysql://localhost:3306/test"); + + when(context.hasUndoLog()).thenReturn(true); + when(context.getXid()).thenReturn("xid"); + when(context.getBranchId()).thenReturn(123456L); + when(context.getUndoItems()).thenReturn(Collections.emptyList()); + + // Spy on the manager to mock getRegistry() + MySQLUndoLogManager manager = spy(new MySQLUndoLogManager()); + doReturn(registry).when(manager).getRegistry(); + + manager.flushUndoLogs(connectionProxy); + + verify(registry).getSummary(eq(UndoLogConstants.SUMMARY_UNDO_LOG_SIZE)); + verify(summary).increase(anyLong()); + } + + @Test + public void testDeleteUndoLogMetrics() throws Exception { + Registry registry = mock(Registry.class); + Timer timer = mock(Timer.class); + Counter counter = mock(Counter.class); + + when(registry.getTimer(any(Id.class))).thenReturn(timer); + when(registry.getCounter(any(Id.class))).thenReturn(counter); + + Connection connection = mock(Connection.class); + PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement); + + // Spy on the manager to mock getRegistry() + MySQLUndoLogManager manager = spy(new MySQLUndoLogManager()); + doReturn(registry).when(manager).getRegistry(); + + manager.deleteUndoLog("xid", 123L, connection); + + verify(registry).getTimer(eq(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)); + verify(timer).record(anyLong(), any(TimeUnit.class)); + verify(registry).getCounter(eq(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)); + verify(counter).increase(1); + } + + @Test + public void testBatchDeleteUndoLogMetrics() throws Exception { + Registry registry = mock(Registry.class); + Timer timer = mock(Timer.class); + Counter counter = mock(Counter.class); + + when(registry.getTimer(any(Id.class))).thenReturn(timer); + when(registry.getCounter(any(Id.class))).thenReturn(counter); + + Connection connection = mock(Connection.class); + PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement); + when(preparedStatement.executeUpdate()).thenReturn(1); // Simulate rows affected + + // Spy on the manager to mock getRegistry() + MySQLUndoLogManager manager = spy(new MySQLUndoLogManager()); + doReturn(registry).when(manager).getRegistry(); + + manager.batchDeleteUndoLog( + new HashSet<>(Arrays.asList("xid1", "xid2")), new HashSet<>(Arrays.asList(1L, 2L)), connection); + + verify(registry).getTimer(eq(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)); + verify(timer).record(anyLong(), any(TimeUnit.class)); + verify(registry).getCounter(eq(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)); + verify(counter).increase(1); // Aligned to 1 operation count + } + + @Test + public void testBatchDeleteUndoLogMetricsWithNoRows() throws Exception { + Registry registry = mock(Registry.class); + Timer timer = mock(Timer.class); + Counter counter = mock(Counter.class); + + when(registry.getTimer(any(Id.class))).thenReturn(timer); + when(registry.getCounter(any(Id.class))).thenReturn(counter); + + Connection connection = mock(Connection.class); + PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement); + when(preparedStatement.executeUpdate()).thenReturn(0); // No rows affected + + // Spy on the manager to mock getRegistry() + MySQLUndoLogManager manager = spy(new MySQLUndoLogManager()); + doReturn(registry).when(manager).getRegistry(); + + manager.batchDeleteUndoLog( + new HashSet<>(Collections.singletonList("xid")), + new HashSet<>(Collections.singletonList(1L)), + connection); + + verify(registry).getTimer(eq(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)); + verify(timer).record(anyLong(), any(TimeUnit.class)); + // Counter should NOT be increased if rows deleted is 0 + verify(registry, never()).getCounter(any(Id.class)); + } +}