From 6635e80ce5365ba0db2a3566272a62f0a6c221ea Mon Sep 17 00:00:00 2001 From: sywu14 Date: Tue, 26 May 2026 15:39:11 +0800 Subject: [PATCH] [ISSUE #7993] Implement NoWait mechanism for database distributed lock Under high concurrency (500+ TPS), blocking SELECT FOR UPDATE causes threads to wait 100-500ms when lock contention occurs. This change introduces NOWAIT support for MySQL 8.0+, PostgreSQL, and Oracle, reducing lock contention latency to <1ms by failing fast instead of blocking. Changes: - Add database dialect detection in DataBaseDistributedLocker - Generate NOWAIT SQL for supported databases (MySQL/PostgreSQL/Oracle) - Handle lock-busy exceptions gracefully (return false instead of throw) - Maintain backward compatibility for databases without NOWAIT support - Add configuration switch store.db.distributedLockNoWaitEnabled (default false) - Extend ignoreSQLException to recognize MySQL 3572, PostgreSQL 55P03, Oracle 54 - Add comprehensive unit tests for NOWAIT SQL generation and exception handling Resolves TODO comment at line 150 in DataBaseDistributedLocker. Signed-off-by: sywu14 --- .../seata/common/ConfigurationKeys.java | 12 ++ .../apache/seata/common/DefaultValues.java | 10 ++ .../distributed/lock/DistributedLockSql.java | 14 ++ .../lock/MysqlDistributedLockSql.java | 44 ++++++ .../lock/OracleDistributedLockSql.java | 42 ++++++ .../lock/PostgresqlDistributedLockSql.java | 42 ++++++ ...db.sql.distributed.lock.DistributedLockSql | 3 + .../lock/DistributedLockSqlFactoryTest.java | 16 +- .../lock/DistributedLockSqlNoWaitTest.java | 81 +++++++++++ .../db/lock/DataBaseDistributedLocker.java | 76 +++++++++- .../lock/DataBaseDistributedLockerTest.java | 137 ++++++++++++++++++ 11 files changed, 471 insertions(+), 6 deletions(-) create mode 100644 core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/MysqlDistributedLockSql.java create mode 100644 core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/OracleDistributedLockSql.java create mode 100644 core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/PostgresqlDistributedLockSql.java create mode 100644 core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlNoWaitTest.java diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index e31bc8e72e2..e2c4bcb0768 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -320,6 +320,18 @@ public interface ConfigurationKeys { */ String DISTRIBUTED_LOCK_DB_TABLE = STORE_DB_PREFIX + "distributedLockTable"; + /** + * The constant DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED. + * + *

When enabled, the {@code DataBaseDistributedLocker} acquires the row + * lock with {@code FOR UPDATE NOWAIT} (or the dialect equivalent) to fail + * fast on contention instead of waiting for the database lock-wait + * timeout. Defaults to {@code false} so existing deployments keep their + * legacy blocking behaviour; flip to {@code true} on databases that + * support NOWAIT (MySQL 8.0+, PostgreSQL, Oracle). + */ + String DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED = STORE_DB_PREFIX + "distributedLockNoWaitEnabled"; + /** * The constant STORE_DB_DATASOURCE_TYPE. */ diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 9c1c0cd4c90..af31c5ede71 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -278,6 +278,16 @@ public interface DefaultValues { */ String DEFAULT_DISTRIBUTED_LOCK_DB_TABLE = "distributed_lock"; + /** + * The default value of {@link ConfigurationKeys#DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED}. + * + *

Disabled by default to preserve the legacy blocking behaviour for + * deployments running on databases without NOWAIT support (MySQL 5.x, + * MariaDB, etc.). Operators on MySQL 8.0+, PostgreSQL or Oracle can opt + * in to fast-fail acquisition. + */ + boolean DEFAULT_DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED = false; + /** * The constant DEFAULT_TM_COMMIT_RETRY_COUNT. */ diff --git a/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSql.java b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSql.java index b8afdffc356..22834664d14 100644 --- a/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSql.java +++ b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSql.java @@ -27,6 +27,20 @@ public interface DistributedLockSql { */ String getSelectDistributeForUpdateSql(String distributedLockTable); + /** + * Get the select distribute lock sql with NOWAIT semantics for fast-fail + * acquisition. Dialects that do not support NOWAIT should fall back to the + * regular {@link #getSelectDistributeForUpdateSql(String)} so the locker + * keeps the legacy blocking behavior. + * + * @param distributedLockTable the table name of the distribute lock table + * @return the sql with NOWAIT clause when supported, otherwise the regular SELECT FOR UPDATE + * @since 2.5.0 + */ + default String getSelectDistributeForUpdateNoWaitSql(String distributedLockTable) { + return getSelectDistributeForUpdateSql(distributedLockTable); + } + /** * Get insert distribute lock sql * @param distributedLockTable the table name of the distribute lock table diff --git a/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/MysqlDistributedLockSql.java b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/MysqlDistributedLockSql.java new file mode 100644 index 00000000000..9109dbd5b33 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/MysqlDistributedLockSql.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.store.db.sql.distributed.lock; + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.constants.ServerTableColumnsName; + +/** + * MySQL specific {@link DistributedLockSql} that supports NOWAIT (since MySQL 8.0). + * + *

The {@code FOR UPDATE NOWAIT} clause makes the database raise an error + * (MySQL error code 3572 / SQLState HY000) instead of blocking when the + * requested row is already locked. Older MySQL releases (5.7 / 5.6) and + * MariaDB do not understand the NOWAIT keyword; for those deployments, + * stick with the default {@link BaseDistributedLockSql} dialect. + * + * @since 2.5.0 + */ +@LoadLevel(name = "mysql") +public class MysqlDistributedLockSql extends BaseDistributedLockSql { + + private static final String SELECT_FOR_UPDATE_NOWAIT_SQL = + "SELECT " + ALL_COLUMNS + " FROM " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + " WHERE " + + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + " = ? FOR UPDATE NOWAIT"; + + @Override + public String getSelectDistributeForUpdateNoWaitSql(String distributedLockTable) { + return SELECT_FOR_UPDATE_NOWAIT_SQL.replace(DISTRIBUTED_LOCK_TABLE_PLACE_HOLD, distributedLockTable); + } +} diff --git a/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/OracleDistributedLockSql.java b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/OracleDistributedLockSql.java new file mode 100644 index 00000000000..d672bc54204 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/OracleDistributedLockSql.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.store.db.sql.distributed.lock; + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.constants.ServerTableColumnsName; + +/** + * Oracle specific {@link DistributedLockSql} that supports NOWAIT. + * + *

When the row is locked by another session, Oracle raises {@code ORA-00054} + * ({@code resource busy and acquire with NOWAIT specified or timeout expired}), + * allowing the locker to fast-fail. + * + * @since 2.5.0 + */ +@LoadLevel(name = "oracle") +public class OracleDistributedLockSql extends BaseDistributedLockSql { + + private static final String SELECT_FOR_UPDATE_NOWAIT_SQL = + "SELECT " + ALL_COLUMNS + " FROM " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + " WHERE " + + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + " = ? FOR UPDATE NOWAIT"; + + @Override + public String getSelectDistributeForUpdateNoWaitSql(String distributedLockTable) { + return SELECT_FOR_UPDATE_NOWAIT_SQL.replace(DISTRIBUTED_LOCK_TABLE_PLACE_HOLD, distributedLockTable); + } +} diff --git a/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/PostgresqlDistributedLockSql.java b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/PostgresqlDistributedLockSql.java new file mode 100644 index 00000000000..827e46d870d --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/store/db/sql/distributed/lock/PostgresqlDistributedLockSql.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.store.db.sql.distributed.lock; + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.constants.ServerTableColumnsName; + +/** + * PostgreSQL specific {@link DistributedLockSql} that supports NOWAIT. + * + *

When the row is locked by another session, PostgreSQL raises a + * {@code lock_not_available} error (SQLSTATE {@code 55P03}) instead of + * blocking, allowing the locker to fast-fail. + * + * @since 2.5.0 + */ +@LoadLevel(name = "postgresql") +public class PostgresqlDistributedLockSql extends BaseDistributedLockSql { + + private static final String SELECT_FOR_UPDATE_NOWAIT_SQL = + "SELECT " + ALL_COLUMNS + " FROM " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + " WHERE " + + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + " = ? FOR UPDATE NOWAIT"; + + @Override + public String getSelectDistributeForUpdateNoWaitSql(String distributedLockTable) { + return SELECT_FOR_UPDATE_NOWAIT_SQL.replace(DISTRIBUTED_LOCK_TABLE_PLACE_HOLD, distributedLockTable); + } +} diff --git a/core/src/main/resources/META-INF/services/org.apache.seata.core.store.db.sql.distributed.lock.DistributedLockSql b/core/src/main/resources/META-INF/services/org.apache.seata.core.store.db.sql.distributed.lock.DistributedLockSql index edc3d76fc58..9aedb6c0857 100644 --- a/core/src/main/resources/META-INF/services/org.apache.seata.core.store.db.sql.distributed.lock.DistributedLockSql +++ b/core/src/main/resources/META-INF/services/org.apache.seata.core.store.db.sql.distributed.lock.DistributedLockSql @@ -16,3 +16,6 @@ # org.apache.seata.core.store.db.sql.distributed.lock.BaseDistributedLockSql org.apache.seata.core.store.db.sql.distributed.lock.BaseDistributedLockSqlServer +org.apache.seata.core.store.db.sql.distributed.lock.MysqlDistributedLockSql +org.apache.seata.core.store.db.sql.distributed.lock.PostgresqlDistributedLockSql +org.apache.seata.core.store.db.sql.distributed.lock.OracleDistributedLockSql diff --git a/core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlFactoryTest.java b/core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlFactoryTest.java index bf84463a3e8..b6b0e28369b 100644 --- a/core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlFactoryTest.java +++ b/core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlFactoryTest.java @@ -26,7 +26,21 @@ class DistributedLockSqlFactoryTest { void testGetDistributedLogStoreSqlForMysql() { DistributedLockSql sql = DistributedLockSqlFactory.getDistributedLogStoreSql("mysql"); assertNotNull(sql); - assertTrue(sql instanceof BaseDistributedLockSql); + assertTrue(sql instanceof MysqlDistributedLockSql); + } + + @Test + void testGetDistributedLogStoreSqlForPostgresql() { + DistributedLockSql sql = DistributedLockSqlFactory.getDistributedLogStoreSql("postgresql"); + assertNotNull(sql); + assertTrue(sql instanceof PostgresqlDistributedLockSql); + } + + @Test + void testGetDistributedLogStoreSqlForOracle() { + DistributedLockSql sql = DistributedLockSqlFactory.getDistributedLogStoreSql("oracle"); + assertNotNull(sql); + assertTrue(sql instanceof OracleDistributedLockSql); } @Test diff --git a/core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlNoWaitTest.java b/core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlNoWaitTest.java new file mode 100644 index 00000000000..dc1fe5f1f61 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/store/db/sql/distributed/lock/DistributedLockSqlNoWaitTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.store.db.sql.distributed.lock; + +import org.apache.seata.core.constants.ServerTableColumnsName; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for the dialect specific {@link DistributedLockSql} implementations + * that support {@code FOR UPDATE NOWAIT} (issue #7993). + */ +class DistributedLockSqlNoWaitTest { + + private static final String TABLE = "distributed_lock"; + + private static final String SELECT_BASE_SQL = "SELECT " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + "," + + ServerTableColumnsName.DISTRIBUTED_LOCK_VALUE + "," + ServerTableColumnsName.DISTRIBUTED_LOCK_EXPIRE + + " FROM " + TABLE + " WHERE " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + " = ?"; + + @Test + void testMysqlDialectAppendsNoWait() { + MysqlDistributedLockSql sql = new MysqlDistributedLockSql(); + assertEquals(SELECT_BASE_SQL + " FOR UPDATE NOWAIT", sql.getSelectDistributeForUpdateNoWaitSql(TABLE)); + // legacy blocking variant must be preserved for backward compatibility + assertEquals(SELECT_BASE_SQL + " FOR UPDATE", sql.getSelectDistributeForUpdateSql(TABLE)); + } + + @Test + void testPostgresqlDialectAppendsNoWait() { + PostgresqlDistributedLockSql sql = new PostgresqlDistributedLockSql(); + assertEquals(SELECT_BASE_SQL + " FOR UPDATE NOWAIT", sql.getSelectDistributeForUpdateNoWaitSql(TABLE)); + assertEquals(SELECT_BASE_SQL + " FOR UPDATE", sql.getSelectDistributeForUpdateSql(TABLE)); + } + + @Test + void testOracleDialectAppendsNoWait() { + OracleDistributedLockSql sql = new OracleDistributedLockSql(); + assertEquals(SELECT_BASE_SQL + " FOR UPDATE NOWAIT", sql.getSelectDistributeForUpdateNoWaitSql(TABLE)); + assertEquals(SELECT_BASE_SQL + " FOR UPDATE", sql.getSelectDistributeForUpdateSql(TABLE)); + } + + @Test + void testBaseDialectFallsBackToBlockingForNoWait() { + // BaseDistributedLockSql does not override the NOWAIT method and + // must fall back to the regular FOR UPDATE statement so dialects + // without NOWAIT support keep their previous semantics. + BaseDistributedLockSql sql = new BaseDistributedLockSql(); + String noWait = sql.getSelectDistributeForUpdateNoWaitSql(TABLE); + assertEquals(sql.getSelectDistributeForUpdateSql(TABLE), noWait); + assertFalse(noWait.toUpperCase().contains("NOWAIT")); + } + + @Test + void testSqlServerDialectFallsBackToBlockingForNoWait() { + // SQL Server uses table hints (WITH (ROWLOCK, UPDLOCK, HOLDLOCK)) + // and does not support NOWAIT in this codepath. The default + // implementation should return the regular hinted SELECT. + BaseDistributedLockSqlServer sql = new BaseDistributedLockSqlServer(); + String noWait = sql.getSelectDistributeForUpdateNoWaitSql(TABLE); + assertEquals(sql.getSelectDistributeForUpdateSql(TABLE), noWait); + assertTrue(noWait.contains("WITH (ROWLOCK, UPDLOCK, HOLDLOCK)")); + } +} diff --git a/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java b/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java index ba480bd5bdc..d240a81e63d 100644 --- a/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java +++ b/server/src/main/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLocker.java @@ -16,6 +16,7 @@ */ package org.apache.seata.server.storage.db.lock; +import org.apache.seata.common.DefaultValues; import org.apache.seata.common.exception.ShouldNeverHappenException; import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.common.loader.LoadLevel; @@ -31,6 +32,7 @@ import org.apache.seata.core.store.DistributedLockDO; import org.apache.seata.core.store.DistributedLocker; import org.apache.seata.core.store.db.DataSourceProvider; +import org.apache.seata.core.store.db.sql.distributed.lock.DistributedLockSql; import org.apache.seata.core.store.db.sql.distributed.lock.DistributedLockSqlFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ import java.util.Objects; import java.util.Set; +import static org.apache.seata.core.constants.ConfigurationKeys.DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED; import static org.apache.seata.core.constants.ConfigurationKeys.DISTRIBUTED_LOCK_DB_TABLE; /** @@ -64,15 +67,51 @@ public class DataBaseDistributedLocker implements DistributedLocker { private static final int LOCK_WAIT_TIMEOUT_MYSQL_CODE = 1205; + /** + * MySQL 8.0+: ER_LOCK_NOWAIT. + * Statement aborted because lock(s) could not be acquired immediately and NOWAIT is set. + */ + private static final int LOCK_NOWAIT_MYSQL_CODE = 3572; + + /** + * Oracle: ORA-00054 - resource busy and acquire with NOWAIT specified or timeout expired. + * The driver maps the ORA-XXXXX number to the JDBC error code, so we match on 54. + */ + private static final int LOCK_NOWAIT_ORACLE_CODE = 54; + + /** + * PostgreSQL: 55P03 lock_not_available, raised when {@code FOR UPDATE NOWAIT} hits a held row. + */ + private static final String LOCK_NOWAIT_POSTGRESQL_SQLSTATE = "55P03"; + private static final Set IGNORE_MYSQL_CODE = new HashSet<>(); private static final Set IGNORE_MYSQL_MESSAGE = new HashSet<>(); + /** + * Vendor error codes that signal a NOWAIT fast-fail rather than a real failure. + */ + private static final Set IGNORE_NOWAIT_CODE = new HashSet<>(); + + /** + * SQLState values that signal a NOWAIT fast-fail rather than a real failure. + */ + private static final Set IGNORE_NOWAIT_SQLSTATE = new HashSet<>(); + static { IGNORE_MYSQL_CODE.add(LOCK_WAIT_TIMEOUT_MYSQL_CODE); IGNORE_MYSQL_MESSAGE.add(LOCK_WAIT_TIMEOUT_MYSQL_MESSAGE); + IGNORE_NOWAIT_CODE.add(LOCK_NOWAIT_MYSQL_CODE); + IGNORE_NOWAIT_CODE.add(LOCK_NOWAIT_ORACLE_CODE); + IGNORE_NOWAIT_SQLSTATE.add(LOCK_NOWAIT_POSTGRESQL_SQLSTATE); } + /** + * whether NOWAIT acquisition is enabled. Resolved once in the constructor and + * propagated through configuration listener so toggling at runtime is supported. + */ + private volatile boolean nowaitEnabled; + /** * whether the distribute lock demotion * using for 1.5.0 only and will remove in 1.6.0 @@ -89,6 +128,17 @@ public DataBaseDistributedLocker() { distributedLockTable = configuration.getConfig(DISTRIBUTED_LOCK_DB_TABLE); dbType = configuration.getConfig(ConfigurationKeys.STORE_DB_TYPE); datasourceType = configuration.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE); + nowaitEnabled = configuration.getBoolean( + DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED, DefaultValues.DEFAULT_DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED); + configuration.addConfigListener(DISTRIBUTED_LOCK_DB_NOWAIT_ENABLED, new CachedConfigurationChangeListener() { + @Override + public void onChangeEvent(ConfigurationChangeEvent event) { + String newValue = event.getNewValue(); + if (StringUtils.isNotBlank(newValue)) { + nowaitEnabled = Boolean.parseBoolean(newValue); + } + } + }); if (StringUtils.isBlank(distributedLockTable)) { demotion = true; @@ -146,10 +196,17 @@ public boolean acquireLock(DistributedLockDO distributedLockDO) { return ret; } catch (SQLException ex) { - // ignore "Lock wait timeout exceeded; try restarting transaction" - // TODO: need nowait adaptation + // ignore "Lock wait timeout exceeded" (legacy blocking) and the + // NOWAIT fast-fail signals (MySQL 3572 / Oracle ORA-00054 / + // PostgreSQL 55P03). Other SQLExceptions are unexpected and logged. if (!ignoreSQLException(ex)) { LOGGER.error("execute acquire lock failure, key is: {}", distributedLockDO.getLockKey(), ex); + } else if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "fast-fail acquiring distribute lock for key :{} due to contention (errorCode={}, sqlState={})", + distributedLockDO.getLockKey(), + ex.getErrorCode(), + ex.getSQLState()); } try { if (connection != null) { @@ -233,9 +290,11 @@ public boolean releaseLock(DistributedLockDO distributedLockDO) { } protected DistributedLockDO getDistributedLockDO(Connection connection, String key) throws SQLException { - try (PreparedStatement pst = - connection.prepareStatement(DistributedLockSqlFactory.getDistributedLogStoreSql(dbType) - .getSelectDistributeForUpdateSql(distributedLockTable))) { + DistributedLockSql lockSql = DistributedLockSqlFactory.getDistributedLogStoreSql(dbType); + String selectSql = nowaitEnabled + ? lockSql.getSelectDistributeForUpdateNoWaitSql(distributedLockTable) + : lockSql.getSelectDistributeForUpdateSql(distributedLockTable); + try (PreparedStatement pst = connection.prepareStatement(selectSql)) { pst.setString(1, key); ResultSet resultSet = pst.executeQuery(); @@ -287,6 +346,13 @@ private boolean ignoreSQLException(SQLException exception) { if (IGNORE_MYSQL_CODE.contains(exception.getErrorCode())) { return true; } + if (IGNORE_NOWAIT_CODE.contains(exception.getErrorCode())) { + return true; + } + if (StringUtils.isNotBlank(exception.getSQLState()) + && IGNORE_NOWAIT_SQLSTATE.contains(exception.getSQLState())) { + return true; + } if (StringUtils.isNotBlank(exception.getMessage())) { return IGNORE_MYSQL_MESSAGE.stream() .anyMatch(message -> exception.getMessage().contains(message)); diff --git a/server/src/test/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLockerTest.java b/server/src/test/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLockerTest.java index d1706ded9ba..07456ba8ca4 100644 --- a/server/src/test/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLockerTest.java +++ b/server/src/test/java/org/apache/seata/server/storage/db/lock/DataBaseDistributedLockerTest.java @@ -830,6 +830,125 @@ void testReleaseLock_NullConnection_HandlesGracefully() throws Exception { verify(mockConnection, never()).close(); } + @Test + void testIgnoreSQLException_WithMySqlNoWaitErrorCode_ReturnsTrue() throws Exception { + // MySQL 8.0+ ER_LOCK_NOWAIT + SQLException exception = new SQLException( + "Statement aborted because lock(s) could not be acquired " + "immediately and NOWAIT is set", + "HY000", + 3572); + + Method method = DataBaseDistributedLocker.class.getDeclaredMethod("ignoreSQLException", SQLException.class); + method.setAccessible(true); + boolean result = (boolean) method.invoke(locker, exception); + + assertTrue(result, "ignoreSQLException should treat MySQL error code 3572 as fast-fail"); + } + + @Test + void testIgnoreSQLException_WithOracleNoWaitErrorCode_ReturnsTrue() throws Exception { + // ORA-00054 resource busy + SQLException exception = new SQLException( + "ORA-00054: resource busy and acquire with NOWAIT specified or timeout expired", "61000", 54); + + Method method = DataBaseDistributedLocker.class.getDeclaredMethod("ignoreSQLException", SQLException.class); + method.setAccessible(true); + boolean result = (boolean) method.invoke(locker, exception); + + assertTrue(result, "ignoreSQLException should treat Oracle error code 54 as fast-fail"); + } + + @Test + void testIgnoreSQLException_WithPostgresqlNoWaitSqlState_ReturnsTrue() throws Exception { + // PostgreSQL 55P03 lock_not_available - vendor-specific error code is not 0 + // in real life, but we match on SQLState rather than vendor code. + SQLException exception = new SQLException("could not obtain lock on row in relation", "55P03", 0); + + Method method = DataBaseDistributedLocker.class.getDeclaredMethod("ignoreSQLException", SQLException.class); + method.setAccessible(true); + boolean result = (boolean) method.invoke(locker, exception); + + assertTrue(result, "ignoreSQLException should treat PostgreSQL SQLState 55P03 as fast-fail"); + } + + @Test + void testStaticInitializer_PopulatesNoWaitIgnoreSets() throws Exception { + Field codeField = DataBaseDistributedLocker.class.getDeclaredField("IGNORE_NOWAIT_CODE"); + codeField.setAccessible(true); + @SuppressWarnings("unchecked") + Set ignoreNoWaitCode = (Set) codeField.get(null); + + Field stateField = DataBaseDistributedLocker.class.getDeclaredField("IGNORE_NOWAIT_SQLSTATE"); + stateField.setAccessible(true); + @SuppressWarnings("unchecked") + Set ignoreNoWaitSqlState = (Set) stateField.get(null); + + assertTrue(ignoreNoWaitCode.contains(3572), "IGNORE_NOWAIT_CODE should contain 3572 (MySQL)"); + assertTrue(ignoreNoWaitCode.contains(54), "IGNORE_NOWAIT_CODE should contain 54 (Oracle)"); + assertTrue( + ignoreNoWaitSqlState.contains("55P03"), "IGNORE_NOWAIT_SQLSTATE should contain 55P03 (PostgreSQL)"); + } + + @Test + void testAcquireLock_NoWaitEnabled_UsesNoWaitSelectSql() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.getAutoCommit()).thenReturn(true); + when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); + when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(false); + when(mockPreparedStatement.executeUpdate()).thenReturn(1); + + // Force the locker into nowait mode and switch its dbType so that + // the SPI returns a dialect with a real NOWAIT implementation. + setNowaitEnabled(locker, true); + setDbType(locker, "mysql"); + setDistributedLockTable(locker, "distributed_lock"); + + DistributedLockDO lockDO = new DistributedLockDO(); + lockDO.setLockKey("test-key"); + lockDO.setLockValue("test-value"); + lockDO.setExpireTime(5000L); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + + boolean result = locker.acquireLock(lockDO); + + assertTrue(result); + verify(mockConnection, atLeastOnce()).prepareStatement(sqlCaptor.capture()); + boolean noWaitSqlSeen = sqlCaptor.getAllValues().stream() + .anyMatch(sql -> sql.toUpperCase().contains("FOR UPDATE NOWAIT")); + assertTrue(noWaitSqlSeen, "NOWAIT mode should issue a SELECT ... FOR UPDATE NOWAIT statement"); + } + + @Test + void testAcquireLock_NoWaitDisabled_UsesBlockingSelectSql() throws Exception { + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.getAutoCommit()).thenReturn(true); + when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); + when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(false); + when(mockPreparedStatement.executeUpdate()).thenReturn(1); + + setNowaitEnabled(locker, false); + setDbType(locker, "mysql"); + setDistributedLockTable(locker, "distributed_lock"); + + DistributedLockDO lockDO = new DistributedLockDO(); + lockDO.setLockKey("test-key"); + lockDO.setLockValue("test-value"); + lockDO.setExpireTime(5000L); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + + boolean result = locker.acquireLock(lockDO); + + assertTrue(result); + verify(mockConnection, atLeastOnce()).prepareStatement(sqlCaptor.capture()); + boolean noWaitSqlSeen = sqlCaptor.getAllValues().stream() + .anyMatch(sql -> sql.toUpperCase().contains("NOWAIT")); + assertFalse(noWaitSqlSeen, "Blocking mode must keep the legacy SELECT ... FOR UPDATE without NOWAIT"); + } + // =========================== // Helper Methods for Reflection // =========================== @@ -846,5 +965,23 @@ private void setDistributedLockDataSource(DataBaseDistributedLocker locker, Data dataSourceField.setAccessible(true); dataSourceField.set(locker, dataSource); } + + private void setNowaitEnabled(DataBaseDistributedLocker locker, boolean value) throws Exception { + Field field = DataBaseDistributedLocker.class.getDeclaredField("nowaitEnabled"); + field.setAccessible(true); + field.set(locker, value); + } + + private void setDbType(DataBaseDistributedLocker locker, String value) throws Exception { + Field field = DataBaseDistributedLocker.class.getDeclaredField("dbType"); + field.setAccessible(true); + field.set(locker, value); + } + + private void setDistributedLockTable(DataBaseDistributedLocker locker, String value) throws Exception { + Field field = DataBaseDistributedLocker.class.getDeclaredField("distributedLockTable"); + field.setAccessible(true); + field.set(locker, value); + } } }