Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.env.Environment;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -89,7 +88,6 @@
import java.util.stream.Collectors;

import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT;
import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_BRANCH_SESSION;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_GLOBAL_SESSION;
import static org.apache.seata.server.cluster.raft.sync.msg.RaftSyncMsgType.ADD_VGROUP_MAPPING;
Expand Down Expand Up @@ -387,9 +385,7 @@ public RaftClusterMetadata changeOrInitRaftClusterMetadata() {
cureentPeerId.getIp(),
XID.getPort(),
raftServer.getServerId().getPort(),
Integer.parseInt(
((Environment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT))
.getProperty("server.port", String.valueOf(7091))),
XID.getPort(),
group,
Collections.emptyMap());
leader.setRole(ClusterRole.LEADER);
Expand Down Expand Up @@ -434,9 +430,7 @@ private void syncCurrentNodeInfo(PeerId leaderPeerId) {
cureentPeerId.getIp(),
XID.getPort(),
cureentPeerId.getPort(),
Integer.parseInt(((Environment)
ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT))
.getProperty("server.port", String.valueOf(7091))),
XID.getPort(),
group,
Collections.emptyMap());
InvokeContext invokeContext = new InvokeContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Instance serverInstanceInit() {
instance.setTerm(System.currentTimeMillis());

// load node Endpoint
instance.setControl(new Node.Endpoint(XID.getIpAddress(), serverProperties.getPort(), "http"));
instance.setControl(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "http"));

// load metadata
for (PropertySource<?> propertySource : environment.getPropertySources()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Instance serverInstanceInit() {
instance.setTerm(term);
instance.setRole(stateMachine.isLeader() ? ClusterRole.LEADER : ClusterRole.FOLLOWER);
// load node Endpoint
instance.setControl(new Node.Endpoint(XID.getIpAddress(), serverProperties.getPort(), "http"));
instance.setControl(new Node.Endpoint(XID.getIpAddress(), XID.getPort(), "http"));

PeerId peerId =
RaftServerManager.getRaftServer(raftProperties.getGroup()).getServerId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import org.apache.seata.common.XID;
import org.apache.seata.common.holder.ObjectHolder;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Node;
import org.apache.seata.server.BaseSpringBootTest;
import org.apache.seata.server.cluster.raft.execute.RaftMsgExecute;
import org.apache.seata.server.cluster.raft.processor.request.PutNodeMetadataRequest;
import org.apache.seata.server.cluster.raft.snapshot.StoreSnapshotFile;
import org.apache.seata.server.cluster.raft.snapshot.metadata.LeaderMetadataSnapshotFile;
import org.apache.seata.server.cluster.raft.sync.RaftSyncMessageSerializer;
Expand All @@ -40,14 +46,22 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.StandardEnvironment;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -56,8 +70,10 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -69,16 +85,20 @@ public class RaftStateMachineTest extends BaseSpringBootTest {

private RaftStateMachine raftStateMachine;
private static final String TEST_GROUP = "test-group";
private Object previousEnvironment;

@BeforeEach
public void setUp() {
StoreConfig.setStartupParameter("file", "file", "file");
previousEnvironment = ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);
raftStateMachine = new RaftStateMachine(TEST_GROUP);
}

@AfterEach
public void tearDown() {
StoreConfig.setStartupParameter("file", "file", "file");
RouteTable.getInstance().removeGroup(TEST_GROUP);
restoreEnvironment(previousEnvironment);
}

@Test
Expand Down Expand Up @@ -188,6 +208,88 @@ public void testGetAndSetRaftLeaderMetadata() {
assertEquals(100L, retrieved.getTerm());
}

@Test
public void testChangeOrInitRaftClusterMetadataUsesServicePortForControlEndpoint() {
bindEnvironmentWithServerPort(8088);
raftStateMachine.onLeaderStart(7L);

RaftServer raftServer = mock(RaftServer.class);
when(raftServer.getServerId()).thenReturn(new PeerId("10.0.0.1", 9091));

try (MockedStatic<RaftServerManager> raftServerManagerMock = Mockito.mockStatic(RaftServerManager.class);
MockedStatic<XID> xidMock = Mockito.mockStatic(XID.class)) {
raftServerManagerMock
.when(() -> RaftServerManager.getRaftServer(TEST_GROUP))
.thenReturn(raftServer);
xidMock.when(XID::getPort).thenReturn(7091);

RaftClusterMetadata metadata = raftStateMachine.changeOrInitRaftClusterMetadata();

assertNotNull(metadata.getLeader());
assertEquals(7091, metadata.getLeader().getTransaction().getPort());
assertEquals(7091, metadata.getLeader().getControl().getPort());
assertEquals(9091, metadata.getLeader().getInternal().getPort());
}
}

@Test
public void testSyncCurrentNodeInfoUsesServicePortForControlEndpoint() throws Exception {
bindEnvironmentWithServerPort(8088);

PeerId currentPeerId = new PeerId("10.0.0.2", 9092);
PeerId leaderPeerId = new PeerId("10.0.0.1", 9091);
Configuration configuration = new Configuration();
configuration.addPeer(currentPeerId);
RouteTable.getInstance().updateConfiguration(TEST_GROUP, configuration);

RaftClusterMetadata metadata = new RaftClusterMetadata(1L);
Node leader = new Node();
leader.setVersion("2.1.0");
metadata.setLeader(leader);
raftStateMachine.setRaftLeaderMetadata(metadata);

RaftServer raftServer = mock(RaftServer.class);
when(raftServer.getServerId()).thenReturn(currentPeerId);

CliClientServiceImpl cliClientService = mock(CliClientServiceImpl.class);
RpcClient rpcClient = mock(RpcClient.class);
when(cliClientService.getRpcClient()).thenReturn(rpcClient);

AtomicReference<PutNodeMetadataRequest> capturedRequest = new AtomicReference<>();
doAnswer(invocation -> {
capturedRequest.set(invocation.getArgument(1));
return null;
})
.when(rpcClient)
.invokeAsync(any(), any(), any(), any(), anyLong());

java.lang.reflect.Method method = RaftStateMachine.class.getDeclaredMethod("syncCurrentNodeInfo", PeerId.class);
method.setAccessible(true);

try (MockedStatic<RaftServerManager> raftServerManagerMock = Mockito.mockStatic(RaftServerManager.class);
MockedStatic<XID> xidMock = Mockito.mockStatic(XID.class)) {
raftServerManagerMock
.when(() -> RaftServerManager.getRaftServer(TEST_GROUP))
.thenReturn(raftServer);
raftServerManagerMock
.when(RaftServerManager::getCliClientServiceInstance)
.thenReturn(cliClientService);
xidMock.when(XID::getPort).thenReturn(7091);

assertDoesNotThrow(() -> {
try {
method.invoke(raftStateMachine, leaderPeerId);
} catch (java.lang.reflect.InvocationTargetException e) {
throw new RuntimeException(e.getCause());
}
});
}

assertNotNull(capturedRequest.get());
assertEquals(7091, capturedRequest.get().getNode().getTransaction().getPort());
assertEquals(7091, capturedRequest.get().getNode().getControl().getPort());
}

@Test
public void testMultipleLeaderStarts() {
for (int i = 1; i <= 5; i++) {
Expand Down Expand Up @@ -892,4 +994,31 @@ public void testChangeNodeMetadataUpdatesExistingLearner() {
assertEquals("2.0.0", resultNode.getVersion());
assertEquals(1, updatedMetadata.getLearner().size()); // Should still be 1, not 2
}

private void bindEnvironmentWithServerPort(int serverPort) {
Map<String, Object> properties = new HashMap<>();
properties.put("server.port", serverPort);
ConfigurableEnvironment environment = new StandardEnvironment();
environment.getPropertySources().addFirst(new MapPropertySource("testServerPort", properties));
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, environment);
}

private void restoreEnvironment(Object environment) {
if (environment != null) {
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, environment);
return;
}
removeObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);
}

@SuppressWarnings("unchecked")
private void removeObject(String objectKey) {
try {
Field objectMapField = ObjectHolder.class.getDeclaredField("OBJECT_MAP");
objectMapField.setAccessible(true);
((Map<String, Object>) objectMapField.get(null)).remove(objectKey);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
Comment thread
skt-shinyruo marked this conversation as resolved.
}
Loading
Loading