From b77763295242ea47b50744e04f23e2a53d09b3d0 Mon Sep 17 00:00:00 2001 From: xiaoyu Date: Sun, 29 Mar 2026 23:40:07 +0800 Subject: [PATCH 1/5] feature: reverse load balancing --- .../apache/seata/core/rpc/RemotingServer.java | 19 +++ .../org/apache/seata/core/rpc/RpcContext.java | 18 ++ .../netty/AbstractNettyRemotingServer.java | 23 +++ .../seata/core/rpc/netty/ChannelManager.java | 123 ++++++++++++++ .../ServerConsistentHashLoadBalance.java | 157 ++++++++++++++++++ .../ServerLeastActiveLoadBalance.java | 51 ++++++ .../netty/loadbalance/ServerLoadBalance.java | 36 ++++ .../loadbalance/ServerLoadBalanceFactory.java | 96 +++++++++++ .../loadbalance/ServerRandomLoadBalance.java | 35 ++++ .../ServerRoundRobinLoadBalance.java | 47 ++++++ .../loadbalance/ServerXidLoadBalance.java | 68 ++++++++ ...re.rpc.netty.loadbalance.ServerLoadBalance | 5 + .../ServerLoadBalanceBehaviorTest.java | 105 ++++++++++++ .../ServerLoadBalanceFactoryTest.java | 60 +++++++ .../server/coordinator/AbstractCore.java | 14 +- .../coordinator/DefaultCoordinator.java | 3 +- 16 files changed, 857 insertions(+), 3 deletions(-) create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java create mode 100644 core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance create mode 100644 core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java create mode 100644 core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java index 73a44bb92db..63eb88753d7 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java @@ -17,6 +17,7 @@ package org.apache.seata.core.rpc; import io.netty.channel.Channel; +import org.apache.seata.core.model.BranchType; import org.apache.seata.core.protocol.MessageType; import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.rpc.processor.RemotingProcessor; @@ -45,6 +46,24 @@ public interface RemotingServer { Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException, IOException; + /** + * server send sync request with transaction context. + * + * @param resourceId rm client resourceId + * @param clientId rm client id + * @param msg transaction message {@code org.apache.seata.core.protocol} + * @param tryOtherApp try other app + * @param xid global transaction xid + * @param branchType branch type + * @return client result message + * @throws TimeoutException TimeoutException + */ + default Object sendSyncRequest( + String resourceId, String clientId, Object msg, boolean tryOtherApp, String xid, BranchType branchType) + throws TimeoutException, IOException { + return sendSyncRequest(resourceId, clientId, msg, tryOtherApp); + } + /** * server send sync request. * diff --git a/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java b/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java index 14c20c71b99..97cd342c09a 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java +++ b/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -53,6 +54,11 @@ public class RpcContext { private Set resourceSets; + /** + * current active sync requests on this context + */ + private final AtomicInteger activeCount = new AtomicInteger(); + /** * id */ @@ -323,6 +329,18 @@ public void setClientId(String clientId) { this.clientId = clientId; } + public int getActiveCount() { + return activeCount.get(); + } + + public void incrementActiveCount() { + activeCount.incrementAndGet(); + } + + public void decrementActiveCount() { + activeCount.updateAndGet(current -> current > 0 ? current - 1 : 0); + } + @Override public String toString() { return "RpcContext{" + "applicationId='" diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 1bcad8c6f7f..57f5f49775c 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -26,6 +26,7 @@ import io.netty.handler.timeout.IdleStateEvent; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.model.BranchType; import org.apache.seata.core.protocol.AbstractMessage; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.MergedWarpMessage; @@ -78,6 +79,28 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } + @Override + public Object sendSyncRequest( + String resourceId, String clientId, Object msg, boolean tryOtherApp, String xid, BranchType branchType) + throws TimeoutException, IOException { + Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp, xid, branchType); + if (channel == null) { + throw new IOException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); + } + RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); + if (rpcContext != null) { + rpcContext.incrementActiveCount(); + } + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + try { + return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); + } finally { + if (rpcContext != null) { + rpcContext.decrementActiveCount(); + } + } + } + @Override public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException, IOException { if (channel == null) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java index fbf23869e6c..b93cf36566c 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java @@ -22,17 +22,22 @@ import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.model.BranchType; import org.apache.seata.core.protocol.IncompatibleVersionException; import org.apache.seata.core.protocol.RegisterRMRequest; import org.apache.seata.core.protocol.RegisterTMRequest; import org.apache.seata.core.rpc.RpcContext; +import org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance; +import org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalanceFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -314,6 +319,98 @@ private static Channel getChannelFromSameClientMap(Map clie * @return Corresponding channel, NULL if not found. */ public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) { + return getChannel(resourceId, clientId, tryOtherApp, null, null); + } + + /** + * Gets channel with transaction context. + */ + public static Channel getChannel( + String resourceId, String clientId, boolean tryOtherApp, String xid, BranchType branchType) { + if (branchType != BranchType.AT && branchType != BranchType.TCC) { + return getChannelByPriority(resourceId, clientId, tryOtherApp); + } + + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(branchType); + if (loadBalance == null) { + return getChannelByPriority(resourceId, clientId, tryOtherApp); + } + + String[] clientIdInfo = readClientId(clientId); + if (clientIdInfo == null || clientIdInfo.length != 3) { + throw new FrameworkException("Invalid Client ID: " + clientId); + } + + if (StringUtils.isBlank(resourceId)) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No channel is available, resourceId is null or empty"); + } + return null; + } + + String targetApplicationId = clientIdInfo[0]; + ConcurrentMap>> applicationIdMap = + RM_CHANNELS.get(resourceId); + + if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) { + if (LOGGER.isInfoEnabled()) { + LOGGER.info("No channel is available for resource[{}]", resourceId); + } + return null; + } + + List candidates = collectCandidates(applicationIdMap, targetApplicationId, tryOtherApp); + if (CollectionUtils.isEmpty(candidates)) { + return null; + } + if (candidates.size() == 1) { + return candidates.get(0).getChannel(); + } + RpcContext selected = loadBalance.select(candidates, xid); + return selected == null ? null : selected.getChannel(); + } + + private static List collectCandidates( + ConcurrentMap>> applicationIdMap, + String targetApplicationId, + boolean tryOtherApp) { + List candidates = new ArrayList<>(); + ConcurrentMap> ipMap = applicationIdMap.get(targetApplicationId); + collectActiveContexts(ipMap, candidates); + if (tryOtherApp) { + for (ConcurrentMap.Entry>> appEntry : + applicationIdMap.entrySet()) { + if (appEntry.getKey().equals(targetApplicationId)) { + continue; + } + collectActiveContexts(appEntry.getValue(), candidates); + } + } + return candidates; + } + + private static void collectActiveContexts( + ConcurrentMap> ipMap, List candidates) { + if (ipMap == null || ipMap.isEmpty()) { + return; + } + for (ConcurrentMap portMap : ipMap.values()) { + if (portMap == null || portMap.isEmpty()) { + continue; + } + for (ConcurrentMap.Entry entry : portMap.entrySet()) { + RpcContext rpcContext = entry.getValue(); + Channel channel = rpcContext.getChannel(); + if (channel.isActive()) { + candidates.add(rpcContext); + } else if (portMap.remove(entry.getKey(), rpcContext) && LOGGER.isInfoEnabled()) { + LOGGER.info("Removed inactive {}", channel); + } + } + } + } + + private static Channel getChannelByPriority(String resourceId, String clientId, boolean tryOtherApp) { Channel resultChannel = null; String[] clientIdInfo = readClientId(clientId); @@ -509,11 +606,37 @@ private static Channel tryOtherApp( * @return the rm channels,key:resourceId,value:channel */ public static Map getRmChannels() { + return getRmChannels(null); + } + + /** + * get rm channels by branch type. + * + * @param branchType branch type + * @return the rm channels,key:resourceId,value:channel + */ + public static Map getRmChannels(BranchType branchType) { if (RM_CHANNELS.isEmpty()) { return Collections.emptyMap(); } Map channels = new HashMap<>(RM_CHANNELS.size()); + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(branchType); RM_CHANNELS.forEach((resourceId, value) -> { + if (loadBalance != null && branchType == BranchType.AT) { + List candidates = new ArrayList<>(); + for (ConcurrentMap> ipMap : value.values()) { + collectActiveContexts(ipMap, candidates); + } + if (CollectionUtils.isNotEmpty(candidates)) { + RpcContext selected = candidates.size() == 1 + ? candidates.get(0) + : loadBalance.select(candidates, resourceId); + if (selected != null) { + channels.put(resourceId, selected.getChannel()); + } + } + return; + } Channel channel = tryOtherApp(value, null); if (channel == null) { return; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java new file mode 100644 index 00000000000..3ff7aa7902e --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.common.DefaultValues; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.rpc.RpcContext; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * Consistent hash load balance for server side. + */ +@LoadLevel(name = "ConsistentHashLoadBalance") +public class ServerConsistentHashLoadBalance implements ServerLoadBalance { + + public static final String LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES = "server.loadBalance.virtualNodes"; + + private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance() + .getInt(LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES, DefaultValues.VIRTUAL_NODES_DEFAULT); + + private volatile ConsistentHashSelectorWrapper selectorWrapper; + + @Override + public RpcContext select(List candidates, String xid) { + if (selectorWrapper == null) { + synchronized (this) { + if (selectorWrapper == null) { + selectorWrapper = + new ConsistentHashSelectorWrapper(new ConsistentHashSelector(candidates, VIRTUAL_NODES_NUM), candidates); + } + } + } + String objectKey = StringUtils.isNotBlank(xid) ? xid : buildStableFallbackKey(candidates); + return selectorWrapper.getSelector(candidates).select(objectKey); + } + + private String buildStableFallbackKey(List candidates) { + return candidates.stream() + .map(RpcContext::getClientId) + .filter(StringUtils::isNotBlank) + .sorted() + .collect(Collectors.joining("|")); + } + + private static final class ConsistentHashSelectorWrapper { + + private volatile ConsistentHashSelector selector; + private volatile Set candidates; + + private ConsistentHashSelectorWrapper(ConsistentHashSelector selector, List candidates) { + this.selector = selector; + this.candidates = new HashSet<>(candidates); + } + + private ConsistentHashSelector getSelector(List newCandidates) { + if (!equals(newCandidates)) { + synchronized (this) { + if (!equals(newCandidates)) { + selector = new ConsistentHashSelector(newCandidates, VIRTUAL_NODES_NUM); + this.candidates = new HashSet<>(newCandidates); + } + } + } + return selector; + } + + private boolean equals(List newCandidates) { + if (newCandidates.size() != this.candidates.size()) { + return false; + } + for (RpcContext candidate : newCandidates) { + if (!this.candidates.contains(candidate)) { + return false; + } + } + return true; + } + } + + private static final class ConsistentHashSelector { + + private final SortedMap virtualInvokers = new TreeMap<>(); + private final HashFunction hashFunction = new SHA256Hash(); + + private ConsistentHashSelector(List candidates, int virtualNodes) { + for (RpcContext candidate : candidates) { + for (int i = 0; i < virtualNodes; i++) { + virtualInvokers.put(hashFunction.hash(candidate.toString() + i), candidate); + } + } + } + + private RpcContext select(String objectKey) { + SortedMap tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey)); + Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey(); + return virtualInvokers.get(nodeHashVal); + } + } + + private static class SHA256Hash implements HashFunction { + + private static final ThreadLocal INSTANCE = + ThreadLocal.withInitial(() -> { + try { + return MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException(e.getMessage(), e); + } + }); + + @Override + public long hash(String key) { + MessageDigest instance = INSTANCE.get(); + instance.reset(); + instance.update(key.getBytes(StandardCharsets.UTF_8)); + byte[] digest = instance.digest(key.getBytes(StandardCharsets.UTF_8)); + long hash = 0; + for (int i = 0; i < 8 && i < digest.length; i++) { + hash <<= 8; + hash |= digest[i] & 0xff; + } + return hash; + } + } + + /** + * Hash string to long value. + */ + public interface HashFunction { + long hash(String key); + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java new file mode 100644 index 00000000000..f1f549450fb --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.rpc.RpcContext; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Least active load balance for server side. + */ +@LoadLevel(name = "LeastActiveLoadBalance") +public class ServerLeastActiveLoadBalance implements ServerLoadBalance { + + @Override + public RpcContext select(List candidates, String xid) { + long leastActive = -1; + int leastCount = 0; + int[] leastIndexes = new int[candidates.size()]; + for (int i = 0; i < candidates.size(); i++) { + long active = candidates.get(i).getActiveCount(); + if (leastActive == -1 || active < leastActive) { + leastActive = active; + leastCount = 1; + leastIndexes[0] = i; + } else if (active == leastActive) { + leastIndexes[leastCount++] = i; + } + } + if (leastCount == 1) { + return candidates.get(leastIndexes[0]); + } + return candidates.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java new file mode 100644 index 00000000000..7e37fc71127 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.core.rpc.RpcContext; + +import java.util.List; + +/** + * Server side load balance strategy. + */ +public interface ServerLoadBalance { + + /** + * Select one candidate. + * + * @param candidates candidate rpc contexts + * @param xid global transaction xid, may be null + * @return selected context + */ + RpcContext select(List candidates, String xid); +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java new file mode 100644 index 00000000000..bf1d2e9156e --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.model.BranchType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Server side load balance factory. + */ +public final class ServerLoadBalanceFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(ServerLoadBalanceFactory.class); + + public static final String SERVER_LB_PREFIX = "server.loadBalance."; + public static final String SERVER_LB_ENABLED = SERVER_LB_PREFIX + "%s.enabled"; + public static final String SERVER_LB_TYPE = SERVER_LB_PREFIX + "%s.type"; + public static final String DEFAULT_SERVER_AT_LB_TYPE = "RoundRobinLoadBalance"; + private static final AtomicBoolean AT_DEFAULT_ENABLE_LOGGED = new AtomicBoolean(false); + private static final AtomicBoolean AT_DEFAULT_TYPE_LOGGED = new AtomicBoolean(false); + + private ServerLoadBalanceFactory() {} + + /** + * Whether load balance is enabled for given branch type. + * + *

Only AT/TCC are supported currently.

+ */ + public static boolean isEnabled(BranchType branchType) { + if (branchType == BranchType.AT) { + if (ConfigurationFactory.getInstance().getConfig(buildEnabledKey(branchType)) == null + && AT_DEFAULT_ENABLE_LOGGED.compareAndSet(false, true)) { + LOGGER.info( + "Use default server AT load balance switch as enabled, set {} explicitly to avoid implicit behavior", + buildEnabledKey(branchType)); + } + return ConfigurationFactory.getInstance().getBoolean(buildEnabledKey(branchType), true); + } + if (branchType == BranchType.TCC) { + return ConfigurationFactory.getInstance().getBoolean(buildEnabledKey(branchType), false); + } + return false; + } + + /** + * Get load balance strategy for branch type. + */ + public static ServerLoadBalance getInstance(BranchType branchType) { + if (!isEnabled(branchType)) { + return null; + } + String configType = ConfigurationFactory.getInstance().getConfig(buildTypeKey(branchType)); + String type = configType; + if (branchType == BranchType.AT && StringUtils.isBlank(type)) { + if (AT_DEFAULT_TYPE_LOGGED.compareAndSet(false, true)) { + LOGGER.info( + "Use default server AT load balance type [{}], set {} explicitly for server side", + DEFAULT_SERVER_AT_LB_TYPE, + buildTypeKey(branchType)); + } + type = DEFAULT_SERVER_AT_LB_TYPE; + } + if (branchType == BranchType.TCC && StringUtils.isBlank(type)) { + return null; + } + return EnhancedServiceLoader.load(ServerLoadBalance.class, type); + } + + private static String buildEnabledKey(BranchType branchType) { + return String.format(SERVER_LB_ENABLED, branchType.name().toLowerCase()); + } + + private static String buildTypeKey(BranchType branchType) { + return String.format(SERVER_LB_TYPE, branchType.name().toLowerCase()); + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java new file mode 100644 index 00000000000..139fa52c8fd --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.rpc.RpcContext; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Random load balance for server side. + */ +@LoadLevel(name = "RandomLoadBalance") +public class ServerRandomLoadBalance implements ServerLoadBalance { + + @Override + public RpcContext select(List candidates, String xid) { + return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size())); + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java new file mode 100644 index 00000000000..89a09a21b50 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.rpc.RpcContext; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Round robin load balance for server side. + */ +@LoadLevel(name = "RoundRobinLoadBalance") +public class ServerRoundRobinLoadBalance implements ServerLoadBalance { + + private final AtomicInteger sequence = new AtomicInteger(); + + @Override + public RpcContext select(List candidates, String xid) { + return candidates.get(getPositiveSequence() % candidates.size()); + } + + private int getPositiveSequence() { + for (; ; ) { + int current = sequence.get(); + int next = current >= Integer.MAX_VALUE ? 0 : current + 1; + if (sequence.compareAndSet(current, next)) { + return current; + } + } + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java new file mode 100644 index 00000000000..4c80b6af680 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.rpc.netty.ChannelUtil; +import org.apache.seata.core.rpc.RpcContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Objects; + +/** + * XID load balance for server side. + */ +@LoadLevel(name = "XID") +public class ServerXidLoadBalance implements ServerLoadBalance { + + private static final Logger LOGGER = LoggerFactory.getLogger(ServerXidLoadBalance.class); + private static final String SPLIT = ":"; + private static final ServerLoadBalance RANDOM_LOAD_BALANCE = + EnhancedServiceLoader.load(ServerLoadBalance.class, "RandomLoadBalance"); + + @Override + public RpcContext select(List candidates, String xid) { + if (StringUtils.isNotBlank(xid)) { + String targetAddress = extractAddressFromXid(xid); + if (StringUtils.isNotBlank(targetAddress)) { + for (RpcContext candidate : candidates) { + if (candidate == null || candidate.getChannel() == null) { + continue; + } + String candidateAddress = ChannelUtil.getAddressFromChannel(candidate.getChannel()); + if (Objects.equals(targetAddress, candidateAddress)) { + return candidate; + } + } + } + LOGGER.warn("not found target rm channel by xid: {}, fallback to random", xid); + } + return RANDOM_LOAD_BALANCE.select(candidates, xid); + } + + private String extractAddressFromXid(String xid) { + int lastSplitIndex = xid.lastIndexOf(SPLIT); + if (lastSplitIndex <= 0) { + return null; + } + return xid.substring(0, lastSplitIndex); + } +} diff --git a/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance new file mode 100644 index 00000000000..10a45cddf87 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance @@ -0,0 +1,5 @@ +org.apache.seata.core.rpc.netty.loadbalance.ServerRandomLoadBalance +org.apache.seata.core.rpc.netty.loadbalance.ServerXidLoadBalance +org.apache.seata.core.rpc.netty.loadbalance.ServerRoundRobinLoadBalance +org.apache.seata.core.rpc.netty.loadbalance.ServerLeastActiveLoadBalance +org.apache.seata.core.rpc.netty.loadbalance.ServerConsistentHashLoadBalance diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java new file mode 100644 index 00000000000..33bbb94a7a4 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import io.netty.channel.Channel; +import org.apache.seata.core.rpc.RpcContext; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test for server load balance behavior. + */ +public class ServerLoadBalanceBehaviorTest { + + @Test + public void testServerXidLoadBalanceShouldPreferExactAddressFromXid() { + RpcContext target = buildRpcContext("app:10.10.10.10:8091", "10.10.10.10", 8091); + RpcContext other = buildRpcContext("app:10.10.10.11:8091", "10.10.10.11", 8091); + List candidates = Arrays.asList(other, target); + + ServerXidLoadBalance loadBalance = new ServerXidLoadBalance(); + RpcContext selected = loadBalance.select(candidates, "10.10.10.10:8091:123456"); + + Assertions.assertSame(target, selected); + } + + @Test + public void testConsistentHashFallbackKeyShouldBeOrderStable() { + RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); + RpcContext c2 = buildRpcContext("app:10.10.10.2:8091", "10.10.10.2", 8091); + List firstOrder = Arrays.asList(c1, c2); + List secondOrder = Arrays.asList(c2, c1); + + ServerConsistentHashLoadBalance lb1 = new ServerConsistentHashLoadBalance(); + ServerConsistentHashLoadBalance lb2 = new ServerConsistentHashLoadBalance(); + + RpcContext result1 = lb1.select(firstOrder, null); + RpcContext result2 = lb2.select(secondOrder, null); + + Assertions.assertEquals(result1.getClientId(), result2.getClientId()); + } + + @Test + public void testConsistentHashShouldBeThreadSafeForConcurrentSelect() throws ExecutionException, InterruptedException { + RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); + RpcContext c2 = buildRpcContext("app:10.10.10.2:8091", "10.10.10.2", 8091); + List candidates = Collections.unmodifiableList(Arrays.asList(c1, c2)); + + ServerConsistentHashLoadBalance loadBalance = new ServerConsistentHashLoadBalance(); + ExecutorService executor = Executors.newFixedThreadPool(8); + try { + List> futures = new ArrayList<>(32); + for (int i = 0; i < 32; i++) { + futures.add(CompletableFuture.supplyAsync( + () -> loadBalance.select(candidates, "xid-test-1").getClientId(), executor)); + } + String expected = futures.get(0).get(); + for (CompletableFuture future : futures) { + Assertions.assertEquals(expected, future.get()); + } + } finally { + executor.shutdownNow(); + executor.awaitTermination(1, TimeUnit.SECONDS); + } + } + + private RpcContext buildRpcContext(String clientId, String ip, int port) { + Channel channel = mock(Channel.class); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(ip, port)); + when(channel.isActive()).thenReturn(true); + + RpcContext rpcContext = new RpcContext(); + rpcContext.setClientId(clientId); + rpcContext.setChannel(channel); + return rpcContext; + } +} diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java new file mode 100644 index 00000000000..fa9c6c4a126 --- /dev/null +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.loadbalance; + +import org.apache.seata.core.model.BranchType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Test for ServerLoadBalanceFactory. + */ +public class ServerLoadBalanceFactoryTest { + + @AfterEach + public void cleanUp() { + System.clearProperty("seata.server.loadBalance.at.enabled"); + System.clearProperty("seata.server.loadBalance.at.type"); + System.clearProperty("seata.server.loadBalance.tcc.enabled"); + System.clearProperty("seata.server.loadBalance.tcc.type"); + } + + @Test + public void testAtDefaultEnabledAndUseRoundRobinType() { + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.AT); + Assertions.assertNotNull(loadBalance); + Assertions.assertTrue(ServerLoadBalanceFactory.isEnabled(BranchType.AT)); + Assertions.assertTrue(loadBalance instanceof ServerRoundRobinLoadBalance); + } + + @Test + public void testTccNeedExplicitTypeWhenEnabled() { + System.setProperty("seata.server.loadBalance.tcc.enabled", "true"); + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.TCC); + Assertions.assertNull(loadBalance); + } + + @Test + public void testTccEnabledWithType() { + System.setProperty("seata.server.loadBalance.tcc.enabled", "true"); + System.setProperty("seata.server.loadBalance.tcc.type", "RandomLoadBalance"); + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.TCC); + Assertions.assertNotNull(loadBalance); + Assertions.assertTrue(loadBalance instanceof ServerRandomLoadBalance); + } +} diff --git a/server/src/main/java/org/apache/seata/server/coordinator/AbstractCore.java b/server/src/main/java/org/apache/seata/server/coordinator/AbstractCore.java index 60b63a01e7e..5ed2c375382 100644 --- a/server/src/main/java/org/apache/seata/server/coordinator/AbstractCore.java +++ b/server/src/main/java/org/apache/seata/server/coordinator/AbstractCore.java @@ -218,7 +218,12 @@ protected BranchStatus branchCommitSend( throws IOException, TimeoutException { BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT()); + branchSession.getResourceId(), + branchSession.getClientId(), + request, + branchSession.isAT(), + branchSession.getXid(), + branchSession.getBranchType()); return response.getBranchStatus(); } @@ -248,7 +253,12 @@ protected BranchStatus branchRollbackSend( throws IOException, TimeoutException { BranchRollbackResponse response = (BranchRollbackResponse) remotingServer.sendSyncRequest( - branchSession.getResourceId(), branchSession.getClientId(), request, branchSession.isAT()); + branchSession.getResourceId(), + branchSession.getClientId(), + request, + branchSession.isAT(), + branchSession.getXid(), + branchSession.getBranchType()); return response.getBranchStatus(); } diff --git a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java index 4923f3d0512..2fb1eabcc8b 100644 --- a/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java +++ b/server/src/main/java/org/apache/seata/server/coordinator/DefaultCoordinator.java @@ -26,6 +26,7 @@ import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.core.context.RootContext; import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.BranchType; import org.apache.seata.core.model.GlobalStatus; import org.apache.seata.core.protocol.AbstractMessage; import org.apache.seata.core.protocol.AbstractResultMessage; @@ -546,7 +547,7 @@ protected void handleAsyncCommitting() { * Undo log delete. */ protected void undoLogDelete() { - Map rmChannels = ChannelManager.getRmChannels(); + Map rmChannels = ChannelManager.getRmChannels(BranchType.AT); if (rmChannels == null || rmChannels.isEmpty()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("no active rm channels to delete undo log"); From ef85c52b72c2a60b83709095993c838aec78a687 Mon Sep 17 00:00:00 2001 From: xiaoyu Date: Wed, 1 Apr 2026 10:37:46 +0800 Subject: [PATCH 2/5] optimize: remove xid balance --- .../loadbalance/ServerLoadBalanceFactory.java | 12 ++++ .../loadbalance/ServerXidLoadBalance.java | 68 ------------------- ...re.rpc.netty.loadbalance.ServerLoadBalance | 1 - .../ServerLoadBalanceBehaviorTest.java | 12 ---- .../ServerLoadBalanceFactoryTest.java | 16 +++++ 5 files changed, 28 insertions(+), 81 deletions(-) delete mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java index bf1d2e9156e..b08191dc524 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java @@ -36,6 +36,7 @@ public final class ServerLoadBalanceFactory { public static final String SERVER_LB_ENABLED = SERVER_LB_PREFIX + "%s.enabled"; public static final String SERVER_LB_TYPE = SERVER_LB_PREFIX + "%s.type"; public static final String DEFAULT_SERVER_AT_LB_TYPE = "RoundRobinLoadBalance"; + public static final String XID_SERVER_LB_TYPE = "XID"; private static final AtomicBoolean AT_DEFAULT_ENABLE_LOGGED = new AtomicBoolean(false); private static final AtomicBoolean AT_DEFAULT_TYPE_LOGGED = new AtomicBoolean(false); @@ -80,6 +81,17 @@ public static ServerLoadBalance getInstance(BranchType branchType) { } type = DEFAULT_SERVER_AT_LB_TYPE; } + if (StringUtils.equalsIgnoreCase(type, XID_SERVER_LB_TYPE)) { + if (branchType == BranchType.AT) { + LOGGER.warn( + "Server side XID load balance is removed for AT branch type, fallback to [{}]", + DEFAULT_SERVER_AT_LB_TYPE); + type = DEFAULT_SERVER_AT_LB_TYPE; + } else { + LOGGER.warn("Server side XID load balance is removed for {} branch type", branchType); + return null; + } + } if (branchType == BranchType.TCC && StringUtils.isBlank(type)) { return null; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java deleted file mode 100644 index 4c80b6af680..00000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerXidLoadBalance.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty.loadbalance; - -import org.apache.seata.common.loader.EnhancedServiceLoader; -import org.apache.seata.common.loader.LoadLevel; -import org.apache.seata.common.util.StringUtils; -import org.apache.seata.core.rpc.netty.ChannelUtil; -import org.apache.seata.core.rpc.RpcContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Objects; - -/** - * XID load balance for server side. - */ -@LoadLevel(name = "XID") -public class ServerXidLoadBalance implements ServerLoadBalance { - - private static final Logger LOGGER = LoggerFactory.getLogger(ServerXidLoadBalance.class); - private static final String SPLIT = ":"; - private static final ServerLoadBalance RANDOM_LOAD_BALANCE = - EnhancedServiceLoader.load(ServerLoadBalance.class, "RandomLoadBalance"); - - @Override - public RpcContext select(List candidates, String xid) { - if (StringUtils.isNotBlank(xid)) { - String targetAddress = extractAddressFromXid(xid); - if (StringUtils.isNotBlank(targetAddress)) { - for (RpcContext candidate : candidates) { - if (candidate == null || candidate.getChannel() == null) { - continue; - } - String candidateAddress = ChannelUtil.getAddressFromChannel(candidate.getChannel()); - if (Objects.equals(targetAddress, candidateAddress)) { - return candidate; - } - } - } - LOGGER.warn("not found target rm channel by xid: {}, fallback to random", xid); - } - return RANDOM_LOAD_BALANCE.select(candidates, xid); - } - - private String extractAddressFromXid(String xid) { - int lastSplitIndex = xid.lastIndexOf(SPLIT); - if (lastSplitIndex <= 0) { - return null; - } - return xid.substring(0, lastSplitIndex); - } -} diff --git a/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance index 10a45cddf87..fd582a920c3 100644 --- a/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance +++ b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance @@ -1,5 +1,4 @@ org.apache.seata.core.rpc.netty.loadbalance.ServerRandomLoadBalance -org.apache.seata.core.rpc.netty.loadbalance.ServerXidLoadBalance org.apache.seata.core.rpc.netty.loadbalance.ServerRoundRobinLoadBalance org.apache.seata.core.rpc.netty.loadbalance.ServerLeastActiveLoadBalance org.apache.seata.core.rpc.netty.loadbalance.ServerConsistentHashLoadBalance diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java index 33bbb94a7a4..a4b6d01e926 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java @@ -40,18 +40,6 @@ */ public class ServerLoadBalanceBehaviorTest { - @Test - public void testServerXidLoadBalanceShouldPreferExactAddressFromXid() { - RpcContext target = buildRpcContext("app:10.10.10.10:8091", "10.10.10.10", 8091); - RpcContext other = buildRpcContext("app:10.10.10.11:8091", "10.10.10.11", 8091); - List candidates = Arrays.asList(other, target); - - ServerXidLoadBalance loadBalance = new ServerXidLoadBalance(); - RpcContext selected = loadBalance.select(candidates, "10.10.10.10:8091:123456"); - - Assertions.assertSame(target, selected); - } - @Test public void testConsistentHashFallbackKeyShouldBeOrderStable() { RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java index fa9c6c4a126..ab3dde99f69 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java @@ -57,4 +57,20 @@ public void testTccEnabledWithType() { Assertions.assertNotNull(loadBalance); Assertions.assertTrue(loadBalance instanceof ServerRandomLoadBalance); } + + @Test + public void testAtConfiguredXidShouldFallbackToRoundRobin() { + System.setProperty("seata.server.loadBalance.at.type", "XID"); + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.AT); + Assertions.assertNotNull(loadBalance); + Assertions.assertTrue(loadBalance instanceof ServerRoundRobinLoadBalance); + } + + @Test + public void testTccConfiguredXidShouldReturnNull() { + System.setProperty("seata.server.loadBalance.tcc.enabled", "true"); + System.setProperty("seata.server.loadBalance.tcc.type", "XID"); + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.TCC); + Assertions.assertNull(loadBalance); + } } From 49fe2c0475bcf64958fa5c0f2cdd0811964c2c1b Mon Sep 17 00:00:00 2001 From: xiaoyu Date: Fri, 15 May 2026 18:22:56 +0800 Subject: [PATCH 3/5] feature: reverse load balancing --- .../org/apache/seata/core/rpc/RpcContext.java | 2 +- .../seata/core/rpc/netty/ChannelManager.java | 41 +++-- .../ServerConsistentHashLoadBalance.java | 157 ------------------ .../ServerLeastActiveLoadBalance.java | 2 +- .../netty/loadbalance/ServerLoadBalance.java | 3 +- .../loadbalance/ServerLoadBalanceFactory.java | 96 +++++------ .../loadbalance/ServerRandomLoadBalance.java | 2 +- .../ServerRoundRobinLoadBalance.java | 4 +- ...re.rpc.netty.loadbalance.ServerLoadBalance | 1 - .../ServerLoadBalanceBehaviorTest.java | 101 +++++++---- .../ServerLoadBalanceFactoryTest.java | 64 ++++--- 11 files changed, 181 insertions(+), 292 deletions(-) delete mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java b/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java index 97cd342c09a..c4f6a91ea59 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java +++ b/core/src/main/java/org/apache/seata/core/rpc/RpcContext.java @@ -28,9 +28,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; /** * The type rpc context. diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java index b93cf36566c..e1345bbcb2c 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java @@ -323,14 +323,29 @@ public static Channel getChannel(String resourceId, String clientId, boolean try } /** - * Gets channel with transaction context. + * Gets channel with transaction context for server-side load balancing. + * + *

Only AT and TCC branch types support server-side load balancing. + * XA is excluded because its second-phase operations are bound to the local database connection + * of the original RM. SAGA is excluded because its state machine execution context is held + * in memory with no distributed lock protection. For XA/SAGA and unconfigured AT/TCC, + * the original priority-based channel selection logic is used. + * + * @param resourceId Resource ID + * @param clientId Client ID - ApplicationId:IP:Port + * @param tryOtherApp try other app + * @param xid global transaction xid, used for load balancing affinity + * @param branchType branch type, determines whether and which LB algorithm to use + * @return Corresponding channel, NULL if not found. */ public static Channel getChannel( String resourceId, String clientId, boolean tryOtherApp, String xid, BranchType branchType) { + // XA and SAGA do not support server-side load balancing, use original priority-based logic if (branchType != BranchType.AT && branchType != BranchType.TCC) { return getChannelByPriority(resourceId, clientId, tryOtherApp); } + // If no load balance algorithm is configured for this branch type, use original logic ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(branchType); if (loadBalance == null) { return getChannelByPriority(resourceId, clientId, tryOtherApp); @@ -366,7 +381,7 @@ public static Channel getChannel( if (candidates.size() == 1) { return candidates.get(0).getChannel(); } - RpcContext selected = loadBalance.select(candidates, xid); + RpcContext selected = loadBalance.select(candidates); return selected == null ? null : selected.getChannel(); } @@ -612,7 +627,10 @@ public static Map getRmChannels() { /** * get rm channels by branch type. * - * @param branchType branch type + * Only AT and TCC branch types support server-side load balancing. + * For other branch types or when no LB algorithm is configured, the original logic is used. + * + * @param branchType branch type, determines whether and which LB algorithm to use * @return the rm channels,key:resourceId,value:channel */ public static Map getRmChannels(BranchType branchType) { @@ -622,26 +640,25 @@ public static Map getRmChannels(BranchType branchType) { Map channels = new HashMap<>(RM_CHANNELS.size()); ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(branchType); RM_CHANNELS.forEach((resourceId, value) -> { - if (loadBalance != null && branchType == BranchType.AT) { + Channel channel = null; + if (loadBalance != null) { List candidates = new ArrayList<>(); for (ConcurrentMap> ipMap : value.values()) { collectActiveContexts(ipMap, candidates); } if (CollectionUtils.isNotEmpty(candidates)) { - RpcContext selected = candidates.size() == 1 - ? candidates.get(0) - : loadBalance.select(candidates, resourceId); + RpcContext selected = candidates.size() == 1 ? candidates.get(0) : loadBalance.select(candidates); if (selected != null) { - channels.put(resourceId, selected.getChannel()); + channel = selected.getChannel(); } } - return; } - Channel channel = tryOtherApp(value, null); if (channel == null) { - return; + channel = tryOtherApp(value, null); + } + if (channel != null) { + channels.put(resourceId, channel); } - channels.put(resourceId, channel); }); return channels; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java deleted file mode 100644 index 3ff7aa7902e..00000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerConsistentHashLoadBalance.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty.loadbalance; - -import org.apache.seata.common.DefaultValues; -import org.apache.seata.common.loader.LoadLevel; -import org.apache.seata.common.util.StringUtils; -import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.core.rpc.RpcContext; - -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.stream.Collectors; - -/** - * Consistent hash load balance for server side. - */ -@LoadLevel(name = "ConsistentHashLoadBalance") -public class ServerConsistentHashLoadBalance implements ServerLoadBalance { - - public static final String LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES = "server.loadBalance.virtualNodes"; - - private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance() - .getInt(LOAD_BALANCE_CONSISTENT_HASH_VIRTUAL_NODES, DefaultValues.VIRTUAL_NODES_DEFAULT); - - private volatile ConsistentHashSelectorWrapper selectorWrapper; - - @Override - public RpcContext select(List candidates, String xid) { - if (selectorWrapper == null) { - synchronized (this) { - if (selectorWrapper == null) { - selectorWrapper = - new ConsistentHashSelectorWrapper(new ConsistentHashSelector(candidates, VIRTUAL_NODES_NUM), candidates); - } - } - } - String objectKey = StringUtils.isNotBlank(xid) ? xid : buildStableFallbackKey(candidates); - return selectorWrapper.getSelector(candidates).select(objectKey); - } - - private String buildStableFallbackKey(List candidates) { - return candidates.stream() - .map(RpcContext::getClientId) - .filter(StringUtils::isNotBlank) - .sorted() - .collect(Collectors.joining("|")); - } - - private static final class ConsistentHashSelectorWrapper { - - private volatile ConsistentHashSelector selector; - private volatile Set candidates; - - private ConsistentHashSelectorWrapper(ConsistentHashSelector selector, List candidates) { - this.selector = selector; - this.candidates = new HashSet<>(candidates); - } - - private ConsistentHashSelector getSelector(List newCandidates) { - if (!equals(newCandidates)) { - synchronized (this) { - if (!equals(newCandidates)) { - selector = new ConsistentHashSelector(newCandidates, VIRTUAL_NODES_NUM); - this.candidates = new HashSet<>(newCandidates); - } - } - } - return selector; - } - - private boolean equals(List newCandidates) { - if (newCandidates.size() != this.candidates.size()) { - return false; - } - for (RpcContext candidate : newCandidates) { - if (!this.candidates.contains(candidate)) { - return false; - } - } - return true; - } - } - - private static final class ConsistentHashSelector { - - private final SortedMap virtualInvokers = new TreeMap<>(); - private final HashFunction hashFunction = new SHA256Hash(); - - private ConsistentHashSelector(List candidates, int virtualNodes) { - for (RpcContext candidate : candidates) { - for (int i = 0; i < virtualNodes; i++) { - virtualInvokers.put(hashFunction.hash(candidate.toString() + i), candidate); - } - } - } - - private RpcContext select(String objectKey) { - SortedMap tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey)); - Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey(); - return virtualInvokers.get(nodeHashVal); - } - } - - private static class SHA256Hash implements HashFunction { - - private static final ThreadLocal INSTANCE = - ThreadLocal.withInitial(() -> { - try { - return MessageDigest.getInstance("SHA-256"); - } catch (NoSuchAlgorithmException e) { - throw new IllegalStateException(e.getMessage(), e); - } - }); - - @Override - public long hash(String key) { - MessageDigest instance = INSTANCE.get(); - instance.reset(); - instance.update(key.getBytes(StandardCharsets.UTF_8)); - byte[] digest = instance.digest(key.getBytes(StandardCharsets.UTF_8)); - long hash = 0; - for (int i = 0; i < 8 && i < digest.length; i++) { - hash <<= 8; - hash |= digest[i] & 0xff; - } - return hash; - } - } - - /** - * Hash string to long value. - */ - public interface HashFunction { - long hash(String key); - } -} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java index f1f549450fb..901fcb2fcd0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLeastActiveLoadBalance.java @@ -29,7 +29,7 @@ public class ServerLeastActiveLoadBalance implements ServerLoadBalance { @Override - public RpcContext select(List candidates, String xid) { + public RpcContext select(List candidates) { long leastActive = -1; int leastCount = 0; int[] leastIndexes = new int[candidates.size()]; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java index 7e37fc71127..9e595826d41 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalance.java @@ -29,8 +29,7 @@ public interface ServerLoadBalance { * Select one candidate. * * @param candidates candidate rpc contexts - * @param xid global transaction xid, may be null * @return selected context */ - RpcContext select(List candidates, String xid); + RpcContext select(List candidates); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java index b08191dc524..4f3a118f725 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactory.java @@ -23,86 +23,68 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.atomic.AtomicBoolean; - /** * Server side load balance factory. + * Only AT and TCC branch types support server-side load balancing. + * XA and SAGA are explicitly excluded because: + * XA: second-phase operations are bound to the local database connection of the original RM + * SAGA: state machine execution context is held in memory with no distributed lock protection */ public final class ServerLoadBalanceFactory { private static final Logger LOGGER = LoggerFactory.getLogger(ServerLoadBalanceFactory.class); public static final String SERVER_LB_PREFIX = "server.loadBalance."; - public static final String SERVER_LB_ENABLED = SERVER_LB_PREFIX + "%s.enabled"; - public static final String SERVER_LB_TYPE = SERVER_LB_PREFIX + "%s.type"; - public static final String DEFAULT_SERVER_AT_LB_TYPE = "RoundRobinLoadBalance"; - public static final String XID_SERVER_LB_TYPE = "XID"; - private static final AtomicBoolean AT_DEFAULT_ENABLE_LOGGED = new AtomicBoolean(false); - private static final AtomicBoolean AT_DEFAULT_TYPE_LOGGED = new AtomicBoolean(false); - private ServerLoadBalanceFactory() {} + /** + * Configuration key for AT mode load balance type. + */ + public static final String SERVER_LB_AT_TYPE = SERVER_LB_PREFIX + "at.type"; /** - * Whether load balance is enabled for given branch type. - * - *

Only AT/TCC are supported currently.

+ * Configuration key for TCC mode load balance type. */ - public static boolean isEnabled(BranchType branchType) { - if (branchType == BranchType.AT) { - if (ConfigurationFactory.getInstance().getConfig(buildEnabledKey(branchType)) == null - && AT_DEFAULT_ENABLE_LOGGED.compareAndSet(false, true)) { - LOGGER.info( - "Use default server AT load balance switch as enabled, set {} explicitly to avoid implicit behavior", - buildEnabledKey(branchType)); - } - return ConfigurationFactory.getInstance().getBoolean(buildEnabledKey(branchType), true); - } - if (branchType == BranchType.TCC) { - return ConfigurationFactory.getInstance().getBoolean(buildEnabledKey(branchType), false); - } - return false; - } + public static final String SERVER_LB_TCC_TYPE = SERVER_LB_PREFIX + "tcc.type"; + + private ServerLoadBalanceFactory() {} /** - * Get load balance strategy for branch type. + * Get load balance strategy for the given branch type. + * + * Only AT and TCC are supported. For XA/SAGA and other types, returns null + * to indicate that the original channel selection logic should be used. + * + * If the type configuration is not set or is blank, returns null to indicate + * that the original channel selection logic should be used. + * + * @param branchType the branch type + * @return the load balance instance, or null if load balancing is not configured/applicable */ public static ServerLoadBalance getInstance(BranchType branchType) { - if (!isEnabled(branchType)) { + if (branchType != BranchType.AT && branchType != BranchType.TCC) { return null; } - String configType = ConfigurationFactory.getInstance().getConfig(buildTypeKey(branchType)); - String type = configType; - if (branchType == BranchType.AT && StringUtils.isBlank(type)) { - if (AT_DEFAULT_TYPE_LOGGED.compareAndSet(false, true)) { - LOGGER.info( - "Use default server AT load balance type [{}], set {} explicitly for server side", - DEFAULT_SERVER_AT_LB_TYPE, - buildTypeKey(branchType)); - } - type = DEFAULT_SERVER_AT_LB_TYPE; - } - if (StringUtils.equalsIgnoreCase(type, XID_SERVER_LB_TYPE)) { - if (branchType == BranchType.AT) { - LOGGER.warn( - "Server side XID load balance is removed for AT branch type, fallback to [{}]", - DEFAULT_SERVER_AT_LB_TYPE); - type = DEFAULT_SERVER_AT_LB_TYPE; - } else { - LOGGER.warn("Server side XID load balance is removed for {} branch type", branchType); - return null; - } - } - if (branchType == BranchType.TCC && StringUtils.isBlank(type)) { + + String typeKey = buildTypeKey(branchType); + String type = ConfigurationFactory.getInstance().getConfig(typeKey); + + if (StringUtils.isBlank(type)) { return null; } - return EnhancedServiceLoader.load(ServerLoadBalance.class, type); - } - private static String buildEnabledKey(BranchType branchType) { - return String.format(SERVER_LB_ENABLED, branchType.name().toLowerCase()); + try { + return EnhancedServiceLoader.load(ServerLoadBalance.class, type); + } catch (Exception e) { + LOGGER.error( + "Failed to load server load balance [{}] for branch type [{}], fallback to original logic", + type, + branchType, + e); + return null; + } } private static String buildTypeKey(BranchType branchType) { - return String.format(SERVER_LB_TYPE, branchType.name().toLowerCase()); + return SERVER_LB_PREFIX + branchType.name().toLowerCase() + ".type"; } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java index 139fa52c8fd..98fbeab8d44 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRandomLoadBalance.java @@ -29,7 +29,7 @@ public class ServerRandomLoadBalance implements ServerLoadBalance { @Override - public RpcContext select(List candidates, String xid) { + public RpcContext select(List candidates) { return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size())); } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java index 89a09a21b50..a25a4f1396f 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/loadbalance/ServerRoundRobinLoadBalance.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** - * Round robin load balance for server side. + * Round-robin load balance for server side. */ @LoadLevel(name = "RoundRobinLoadBalance") public class ServerRoundRobinLoadBalance implements ServerLoadBalance { @@ -31,7 +31,7 @@ public class ServerRoundRobinLoadBalance implements ServerLoadBalance { private final AtomicInteger sequence = new AtomicInteger(); @Override - public RpcContext select(List candidates, String xid) { + public RpcContext select(List candidates) { return candidates.get(getPositiveSequence() % candidates.size()); } diff --git a/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance index fd582a920c3..e53955e3159 100644 --- a/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance +++ b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance @@ -1,4 +1,3 @@ org.apache.seata.core.rpc.netty.loadbalance.ServerRandomLoadBalance org.apache.seata.core.rpc.netty.loadbalance.ServerRoundRobinLoadBalance org.apache.seata.core.rpc.netty.loadbalance.ServerLeastActiveLoadBalance -org.apache.seata.core.rpc.netty.loadbalance.ServerConsistentHashLoadBalance diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java index a4b6d01e926..10004983aaf 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceBehaviorTest.java @@ -22,15 +22,8 @@ import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -41,45 +34,87 @@ public class ServerLoadBalanceBehaviorTest { @Test - public void testConsistentHashFallbackKeyShouldBeOrderStable() { + public void testRandomLoadBalanceShouldReturnValidCandidate() { RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); RpcContext c2 = buildRpcContext("app:10.10.10.2:8091", "10.10.10.2", 8091); - List firstOrder = Arrays.asList(c1, c2); - List secondOrder = Arrays.asList(c2, c1); + List candidates = Arrays.asList(c1, c2); - ServerConsistentHashLoadBalance lb1 = new ServerConsistentHashLoadBalance(); - ServerConsistentHashLoadBalance lb2 = new ServerConsistentHashLoadBalance(); + ServerRandomLoadBalance lb = new ServerRandomLoadBalance(); + for (int i = 0; i < 100; i++) { + RpcContext selected = lb.select(candidates); + Assertions.assertNotNull(selected); + Assertions.assertTrue(candidates.contains(selected)); + } + } - RpcContext result1 = lb1.select(firstOrder, null); - RpcContext result2 = lb2.select(secondOrder, null); + @Test + public void testRoundRobinLoadBalanceShouldDistributeEvenly() { + RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); + RpcContext c2 = buildRpcContext("app:10.10.10.2:8091", "10.10.10.2", 8091); + List candidates = Arrays.asList(c1, c2); - Assertions.assertEquals(result1.getClientId(), result2.getClientId()); + ServerRoundRobinLoadBalance lb = new ServerRoundRobinLoadBalance(); + int c1Count = 0; + int c2Count = 0; + for (int i = 0; i < 100; i++) { + RpcContext selected = lb.select(candidates); + if (selected == c1) { + c1Count++; + } else { + c2Count++; + } + } + // RoundRobin should distribute evenly + Assertions.assertEquals(50, c1Count); + Assertions.assertEquals(50, c2Count); } @Test - public void testConsistentHashShouldBeThreadSafeForConcurrentSelect() throws ExecutionException, InterruptedException { + public void testLeastActiveLoadBalanceShouldSelectLeastActive() { RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); RpcContext c2 = buildRpcContext("app:10.10.10.2:8091", "10.10.10.2", 8091); - List candidates = Collections.unmodifiableList(Arrays.asList(c1, c2)); - - ServerConsistentHashLoadBalance loadBalance = new ServerConsistentHashLoadBalance(); - ExecutorService executor = Executors.newFixedThreadPool(8); - try { - List> futures = new ArrayList<>(32); - for (int i = 0; i < 32; i++) { - futures.add(CompletableFuture.supplyAsync( - () -> loadBalance.select(candidates, "xid-test-1").getClientId(), executor)); - } - String expected = futures.get(0).get(); - for (CompletableFuture future : futures) { - Assertions.assertEquals(expected, future.get()); - } - } finally { - executor.shutdownNow(); - executor.awaitTermination(1, TimeUnit.SECONDS); + // c1 has lower active count + c1.incrementActiveCount(); + c2.incrementActiveCount(); + c2.incrementActiveCount(); + c2.incrementActiveCount(); + + List candidates = Arrays.asList(c1, c2); + ServerLeastActiveLoadBalance lb = new ServerLeastActiveLoadBalance(); + RpcContext selected = lb.select(candidates); + Assertions.assertEquals(c1, selected); + } + + @Test + public void testLeastActiveLoadBalanceWithSameActiveShouldRandomSelect() { + RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); + RpcContext c2 = buildRpcContext("app:10.10.10.2:8091", "10.10.10.2", 8091); + List candidates = Arrays.asList(c1, c2); + + ServerLeastActiveLoadBalance lb = new ServerLeastActiveLoadBalance(); + // Both have same active count (0), should randomly select + for (int i = 0; i < 100; i++) { + RpcContext selected = lb.select(candidates); + Assertions.assertNotNull(selected); + Assertions.assertTrue(candidates.contains(selected)); } } + @Test + public void testSingleCandidateShouldReturnItDirectly() { + RpcContext c1 = buildRpcContext("app:10.10.10.1:8091", "10.10.10.1", 8091); + List candidates = Arrays.asList(c1); + + ServerRandomLoadBalance randomLb = new ServerRandomLoadBalance(); + Assertions.assertEquals(c1, randomLb.select(candidates)); + + ServerRoundRobinLoadBalance roundRobinLb = new ServerRoundRobinLoadBalance(); + Assertions.assertEquals(c1, roundRobinLb.select(candidates)); + + ServerLeastActiveLoadBalance leastActiveLb = new ServerLeastActiveLoadBalance(); + Assertions.assertEquals(c1, leastActiveLb.select(candidates)); + } + private RpcContext buildRpcContext(String clientId, String ip, int port) { Channel channel = mock(Channel.class); when(channel.remoteAddress()).thenReturn(new InetSocketAddress(ip, port)); diff --git a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java index ab3dde99f69..9c6018423ec 100644 --- a/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java +++ b/core/src/test/java/org/apache/seata/core/rpc/netty/loadbalance/ServerLoadBalanceFactoryTest.java @@ -16,8 +16,8 @@ */ package org.apache.seata.core.rpc.netty.loadbalance; +import org.apache.seata.common.loader.EnhancedServiceLoader; import org.apache.seata.core.model.BranchType; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -26,51 +26,65 @@ */ public class ServerLoadBalanceFactoryTest { - @AfterEach - public void cleanUp() { - System.clearProperty("seata.server.loadBalance.at.enabled"); - System.clearProperty("seata.server.loadBalance.at.type"); - System.clearProperty("seata.server.loadBalance.tcc.enabled"); - System.clearProperty("seata.server.loadBalance.tcc.type"); + @Test + public void testXaReturnsNull() { + // XA does not support server-side load balancing + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.XA); + Assertions.assertNull(loadBalance); + } + + @Test + public void testSagaReturnsNull() { + // SAGA does not support server-side load balancing + ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.SAGA); + Assertions.assertNull(loadBalance); } @Test - public void testAtDefaultEnabledAndUseRoundRobinType() { + public void testAtNotConfiguredReturnsNull() { + // When no type is configured, should return null (use original logic) ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.AT); - Assertions.assertNotNull(loadBalance); - Assertions.assertTrue(ServerLoadBalanceFactory.isEnabled(BranchType.AT)); - Assertions.assertTrue(loadBalance instanceof ServerRoundRobinLoadBalance); + Assertions.assertNull(loadBalance); } @Test - public void testTccNeedExplicitTypeWhenEnabled() { - System.setProperty("seata.server.loadBalance.tcc.enabled", "true"); + public void testTccNotConfiguredReturnsNull() { ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.TCC); Assertions.assertNull(loadBalance); } @Test - public void testTccEnabledWithType() { - System.setProperty("seata.server.loadBalance.tcc.enabled", "true"); - System.setProperty("seata.server.loadBalance.tcc.type", "RandomLoadBalance"); - ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.TCC); + public void testSpiLoadRandomLoadBalance() { + ServerLoadBalance loadBalance = EnhancedServiceLoader.load(ServerLoadBalance.class, "RandomLoadBalance"); Assertions.assertNotNull(loadBalance); Assertions.assertTrue(loadBalance instanceof ServerRandomLoadBalance); } @Test - public void testAtConfiguredXidShouldFallbackToRoundRobin() { - System.setProperty("seata.server.loadBalance.at.type", "XID"); - ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.AT); + public void testSpiLoadRoundRobinLoadBalance() { + ServerLoadBalance loadBalance = EnhancedServiceLoader.load(ServerLoadBalance.class, "RoundRobinLoadBalance"); Assertions.assertNotNull(loadBalance); Assertions.assertTrue(loadBalance instanceof ServerRoundRobinLoadBalance); } @Test - public void testTccConfiguredXidShouldReturnNull() { - System.setProperty("seata.server.loadBalance.tcc.enabled", "true"); - System.setProperty("seata.server.loadBalance.tcc.type", "XID"); - ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(BranchType.TCC); - Assertions.assertNull(loadBalance); + public void testSpiLoadLeastActiveLoadBalance() { + ServerLoadBalance loadBalance = EnhancedServiceLoader.load(ServerLoadBalance.class, "LeastActiveLoadBalance"); + Assertions.assertNotNull(loadBalance); + Assertions.assertTrue(loadBalance instanceof ServerLeastActiveLoadBalance); + } + + @Test + public void testSpiLoadInvalidTypeThrowsException() { + // Loading a non-existent LB type should throw EnhancedServiceNotFoundException + Assertions.assertThrows( + Exception.class, () -> EnhancedServiceLoader.load(ServerLoadBalance.class, "NonExistentLoadBalance")); + } + + @Test + public void testFactoryConfigKeyFormat() { + // Verify the config key format matches the expected pattern + Assertions.assertEquals("server.loadBalance.at.type", ServerLoadBalanceFactory.SERVER_LB_AT_TYPE); + Assertions.assertEquals("server.loadBalance.tcc.type", ServerLoadBalanceFactory.SERVER_LB_TCC_TYPE); } } From 79efcdde31485859fc3096baf03eaedc0ad3b58c Mon Sep 17 00:00:00 2001 From: xiaoyu Date: Fri, 15 May 2026 18:28:21 +0800 Subject: [PATCH 4/5] feature: reverse load balancing --- .../java/org/apache/seata/core/rpc/netty/ChannelManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java index d02f49f3f24..8d236b75e99 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java @@ -372,7 +372,7 @@ public static Channel getChannel(String resourceId, String clientId, boolean try /** * Gets channel with transaction context for server-side load balancing. * - *

Only AT and TCC branch types support server-side load balancing. + * Only AT and TCC branch types support server-side load balancing. * XA is excluded because its second-phase operations are bound to the local database connection * of the original RM. SAGA is excluded because its state machine execution context is held * in memory with no distributed lock protection. For XA/SAGA and unconfigured AT/TCC, From 0c1dc6f1fe8c68fb0a5c8d895bd7e1a13c49cc3a Mon Sep 17 00:00:00 2001 From: xiaoyu Date: Fri, 15 May 2026 22:34:02 +0800 Subject: [PATCH 5/5] feature: optimize code format --- .../rpc/netty/AbstractNettyRemotingServer.java | 2 +- .../seata/core/rpc/netty/ChannelManager.java | 6 ++---- ....core.rpc.netty.loadbalance.ServerLoadBalance | 16 ++++++++++++++++ 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 57f5f49775c..9a7922cb3f8 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -83,7 +83,7 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo public Object sendSyncRequest( String resourceId, String clientId, Object msg, boolean tryOtherApp, String xid, BranchType branchType) throws TimeoutException, IOException { - Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp, xid, branchType); + Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp, branchType); if (channel == null) { throw new IOException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java index 8d236b75e99..f1b0884eb97 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java @@ -366,7 +366,7 @@ private static Channel getChannelFromSameClientMap(Map clie * @return Corresponding channel, NULL if not found. */ public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) { - return getChannel(resourceId, clientId, tryOtherApp, null, null); + return getChannel(resourceId, clientId, tryOtherApp, null); } /** @@ -381,12 +381,10 @@ public static Channel getChannel(String resourceId, String clientId, boolean try * @param resourceId Resource ID * @param clientId Client ID - ApplicationId:IP:Port * @param tryOtherApp try other app - * @param xid global transaction xid, used for load balancing affinity * @param branchType branch type, determines whether and which LB algorithm to use * @return Corresponding channel, NULL if not found. */ - public static Channel getChannel( - String resourceId, String clientId, boolean tryOtherApp, String xid, BranchType branchType) { + public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp, BranchType branchType) { // XA and SAGA do not support server-side load balancing, use original priority-based logic if (branchType != BranchType.AT && branchType != BranchType.TCC) { return getChannelByPriority(resourceId, clientId, tryOtherApp); diff --git a/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance index e53955e3159..cfc12e0da9f 100644 --- a/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance +++ b/core/src/main/resources/META-INF/services/org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance @@ -1,3 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# org.apache.seata.core.rpc.netty.loadbalance.ServerRandomLoadBalance org.apache.seata.core.rpc.netty.loadbalance.ServerRoundRobinLoadBalance org.apache.seata.core.rpc.netty.loadbalance.ServerLeastActiveLoadBalance