Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/src/main/java/org/apache/seata/core/rpc/RemotingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.seata.core.rpc;

import io.netty.channel.Channel;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.protocol.MessageType;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.rpc.processor.RemotingProcessor;
Expand Down Expand Up @@ -45,6 +46,24 @@ public interface RemotingServer {
Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp)
throws TimeoutException, IOException;

/**
* server send sync request with transaction context.
*
* @param resourceId rm client resourceId
* @param clientId rm client id
* @param msg transaction message {@code org.apache.seata.core.protocol}
* @param tryOtherApp try other app
* @param xid global transaction xid
* @param branchType branch type
* @return client result message
* @throws TimeoutException TimeoutException
*/
default Object sendSyncRequest(
String resourceId, String clientId, Object msg, boolean tryOtherApp, String xid, BranchType branchType)
throws TimeoutException, IOException {
return sendSyncRequest(resourceId, clientId, msg, tryOtherApp);
}

/**
* server send sync request.
*
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/seata/core/rpc/RpcContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* The type rpc context.
Expand All @@ -53,6 +54,11 @@ public class RpcContext {

private Set<String> resourceSets;

/**
* current active sync requests on this context
*/
private final AtomicInteger activeCount = new AtomicInteger();

/**
* id
*/
Expand Down Expand Up @@ -337,6 +343,18 @@ public void setClientId(String clientId) {
this.clientId = clientId;
}

public int getActiveCount() {
return activeCount.get();
}

public void incrementActiveCount() {
activeCount.incrementAndGet();
}

public void decrementActiveCount() {
activeCount.updateAndGet(current -> current > 0 ? current - 1 : 0);
}

@Override
public String toString() {
return "RpcContext{" + "applicationId='"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.protocol.AbstractMessage;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.MergedWarpMessage;
Expand Down Expand Up @@ -78,6 +79,28 @@ public Object sendSyncRequest(String resourceId, String clientId, Object msg, bo
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
}

@Override
public Object sendSyncRequest(
String resourceId, String clientId, Object msg, boolean tryOtherApp, String xid, BranchType branchType)
throws TimeoutException, IOException {
Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp, xid, branchType);
if (channel == null) {
throw new IOException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
}
RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel);
if (rpcContext != null) {
rpcContext.incrementActiveCount();
}
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
try {
return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
} finally {
if (rpcContext != null) {
rpcContext.decrementActiveCount();
}
}
}

@Override
public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException, IOException {
if (channel == null) {
Expand Down
146 changes: 143 additions & 3 deletions core/src/main/java/org/apache/seata/core/rpc/netty/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.protocol.IncompatibleVersionException;
import org.apache.seata.core.protocol.RegisterRMRequest;
import org.apache.seata.core.protocol.RegisterTMRequest;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalance;
import org.apache.seata.core.rpc.netty.loadbalance.ServerLoadBalanceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -361,6 +366,113 @@ private static Channel getChannelFromSameClientMap(Map<Integer, RpcContext> clie
* @return Corresponding channel, NULL if not found.
*/
public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) {
return getChannel(resourceId, clientId, tryOtherApp, null, null);
}

/**
* Gets channel with transaction context for server-side load balancing.
*
* Only AT and TCC branch types support server-side load balancing.
* XA is excluded because its second-phase operations are bound to the local database connection
* of the original RM. SAGA is excluded because its state machine execution context is held
* in memory with no distributed lock protection. For XA/SAGA and unconfigured AT/TCC,
* the original priority-based channel selection logic is used.
*
* @param resourceId Resource ID
* @param clientId Client ID - ApplicationId:IP:Port
* @param tryOtherApp try other app
* @param xid global transaction xid, used for load balancing affinity
* @param branchType branch type, determines whether and which LB algorithm to use
* @return Corresponding channel, NULL if not found.
Comment thread
YvCeung marked this conversation as resolved.
*/
public static Channel getChannel(
String resourceId, String clientId, boolean tryOtherApp, String xid, BranchType branchType) {
// XA and SAGA do not support server-side load balancing, use original priority-based logic
if (branchType != BranchType.AT && branchType != BranchType.TCC) {
return getChannelByPriority(resourceId, clientId, tryOtherApp);
}

// If no load balance algorithm is configured for this branch type, use original logic
ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(branchType);
if (loadBalance == null) {
return getChannelByPriority(resourceId, clientId, tryOtherApp);
}

String[] clientIdInfo = readClientId(clientId);
if (clientIdInfo == null || clientIdInfo.length != 3) {
throw new FrameworkException("Invalid Client ID: " + clientId);
}

if (StringUtils.isBlank(resourceId)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No channel is available, resourceId is null or empty");
}
return null;
}

String targetApplicationId = clientIdInfo[0];
ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap =
RM_CHANNELS.get(resourceId);

if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No channel is available for resource[{}]", resourceId);
}
return null;
}

List<RpcContext> candidates = collectCandidates(applicationIdMap, targetApplicationId, tryOtherApp);
if (CollectionUtils.isEmpty(candidates)) {
return null;
}
if (candidates.size() == 1) {
return candidates.get(0).getChannel();
}
RpcContext selected = loadBalance.select(candidates);
return selected == null ? null : selected.getChannel();
}

private static List<RpcContext> collectCandidates(
ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMap,
String targetApplicationId,
boolean tryOtherApp) {
List<RpcContext> candidates = new ArrayList<>();
ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);
collectActiveContexts(ipMap, candidates);
if (tryOtherApp) {
for (ConcurrentMap.Entry<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> appEntry :
applicationIdMap.entrySet()) {
if (appEntry.getKey().equals(targetApplicationId)) {
continue;
}
collectActiveContexts(appEntry.getValue(), candidates);
}
}
return candidates;
}

private static void collectActiveContexts(
ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap, List<RpcContext> candidates) {
if (ipMap == null || ipMap.isEmpty()) {
return;
}
for (ConcurrentMap<Integer, RpcContext> portMap : ipMap.values()) {
if (portMap == null || portMap.isEmpty()) {
continue;
}
for (ConcurrentMap.Entry<Integer, RpcContext> entry : portMap.entrySet()) {
RpcContext rpcContext = entry.getValue();
Channel channel = rpcContext.getChannel();
if (channel.isActive()) {
candidates.add(rpcContext);
} else if (portMap.remove(entry.getKey(), rpcContext) && LOGGER.isInfoEnabled()) {
LOGGER.info("Removed inactive {}", channel);
}
}
}
}

private static Channel getChannelByPriority(String resourceId, String clientId, boolean tryOtherApp) {
Channel resultChannel = null;

String[] clientIdInfo = readClientId(clientId);
Expand Down Expand Up @@ -556,16 +668,44 @@ private static Channel tryOtherApp(
* @return the rm channels,key:resourceId,value:channel
*/
public static Map<String, Channel> getRmChannels() {
return getRmChannels(null);
}

/**
* get rm channels by branch type.
*
* Only AT and TCC branch types support server-side load balancing.
* For other branch types or when no LB algorithm is configured, the original logic is used.
*
* @param branchType branch type, determines whether and which LB algorithm to use
* @return the rm channels,key:resourceId,value:channel
*/
public static Map<String, Channel> getRmChannels(BranchType branchType) {
if (RM_CHANNELS.isEmpty()) {
return Collections.emptyMap();
}
Map<String, Channel> channels = new HashMap<>(RM_CHANNELS.size());
ServerLoadBalance loadBalance = ServerLoadBalanceFactory.getInstance(branchType);
RM_CHANNELS.forEach((resourceId, value) -> {
Channel channel = tryOtherApp(value, null);
Channel channel = null;
if (loadBalance != null) {
List<RpcContext> candidates = new ArrayList<>();
for (ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap : value.values()) {
collectActiveContexts(ipMap, candidates);
}
if (CollectionUtils.isNotEmpty(candidates)) {
RpcContext selected = candidates.size() == 1 ? candidates.get(0) : loadBalance.select(candidates);
if (selected != null) {
channel = selected.getChannel();
}
}
}
if (channel == null) {
return;
channel = tryOtherApp(value, null);
}
if (channel != null) {
channels.put(resourceId, channel);
}
channels.put(resourceId, channel);
});
return channels;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.loadbalance;

import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.core.rpc.RpcContext;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
* Least active load balance for server side.
*/
@LoadLevel(name = "LeastActiveLoadBalance")
public class ServerLeastActiveLoadBalance implements ServerLoadBalance {

@Override
public RpcContext select(List<RpcContext> candidates) {
long leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[candidates.size()];
for (int i = 0; i < candidates.size(); i++) {
long active = candidates.get(i).getActiveCount();
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
} else if (active == leastActive) {
leastIndexes[leastCount++] = i;
}
}
if (leastCount == 1) {
return candidates.get(leastIndexes[0]);
}
return candidates.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.loadbalance;

import org.apache.seata.core.rpc.RpcContext;

import java.util.List;

/**
* Server side load balance strategy.
*/
public interface ServerLoadBalance {

/**
* Select one candidate.
*
* @param candidates candidate rpc contexts
* @return selected context
*/
RpcContext select(List<RpcContext> candidates);
}
Loading
Loading