Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
64fc039
feat(config): add rocksdb file engine option
LegendPei Jun 3, 2026
085bbba
feat(lock): add rocksdb lock mode routing
LegendPei Jun 3, 2026
f0e93b8
feat(session): fail fast for rocksdb file engine
LegendPei Jun 3, 2026
6acd51f
test(rocksdb): cover config and spi routing
LegendPei Jun 3, 2026
7570d9b
fix(rocksdb): fail fast loader lock mode errors
LegendPei Jun 3, 2026
a906db9
Merge branch 'apache:2.x' into feat/rocksdb-phase1-config-and-spi
LegendPei Jun 3, 2026
d8e2794
Merge pull request #1 from LegendPei/feat/rocksdb-phase1-config-and-spi
LegendPei Jun 3, 2026
b74a5a7
feat(rocksdb): add store engine foundation
LegendPei Jun 4, 2026
f1e9236
feat(rocksdb): add session manager integration
LegendPei Jun 4, 2026
4ebc650
feat(rocksdb): add lock manager integration
LegendPei Jun 4, 2026
adf1d2e
feat(rocksdb): migrate file sessions to rocksdb
LegendPei Jun 4, 2026
eb46759
fix(rocksdb): harden phase2 recovery paths
LegendPei Jun 4, 2026
4d8126f
fix(rocksdb): harden recovery session handling
LegendPei Jun 4, 2026
75530b7
fix(rocksdb): address engine factory review comments
LegendPei Jun 4, 2026
076d4a9
fix(rocksdb): stream migration cleanup deletes
LegendPei Jun 4, 2026
c5e8580
Merge pull request #2 from LegendPei/feat/rocksdb-phase2-engine
LegendPei Jun 4, 2026
a0e45ac
feat(rocksdb): add query indexes for file mode
LegendPei Jun 5, 2026
003e41a
feat(rocksdb): cover console indexed queries
LegendPei Jun 5, 2026
a52d16f
perf(rocksdb): stream indexed status queries
LegendPei Jun 5, 2026
8446c78
test(rocksdb): add query index benchmark harness
LegendPei Jun 5, 2026
98e5170
fix(rocksdb): preserve console xid query semantics
LegendPei Jun 6, 2026
79b8b6d
fix(rocksdb): ignore non-positive console transaction ids
LegendPei Jun 6, 2026
8f0d94e
Merge pull request #3 from LegendPei/feat/rocksdb-phase3-query-index
LegendPei Jun 6, 2026
b90c4c1
Merge branch 'apache:2.x' into feat/support-rocksdb-in-filemode
LegendPei Jun 6, 2026
cf3aaf2
Merge branch 'apache:2.x' into feat/support-rocksdb-in-filemode
LegendPei Jun 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ public interface ConfigurationKeys {
*/
String STORE_FILE_DIR = STORE_FILE_PREFIX + "dir";

/**
* The constant STORE_FILE_ENGINE
*/
String STORE_FILE_ENGINE = STORE_FILE_PREFIX + "engine";

/**
* The constant STORE_FILE_ROCKSDB_PREFIX
*/
String STORE_FILE_ROCKSDB_PREFIX = STORE_FILE_PREFIX + "rocksdb.";

/**
* The constant STORE_FILE_ROCKSDB_DIR
*/
String STORE_FILE_ROCKSDB_DIR = STORE_FILE_ROCKSDB_PREFIX + "dir";

/**
* The constant SERVICE_GROUP_MAPPING_PREFIX.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ public enum LockMode {
/**
* raft store
*/
RAFT("raft");
RAFT("raft"),
/**
* RocksDB lock mode for file store engine.
*/
ROCKSDB("rocksdb");

private String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class GlobalSessionParam extends BaseParam implements Serializable {
* the xid
*/
private String xid;
/**
* the transaction id
*/
private Long transactionId;
/**
* the application id
*/
Expand Down Expand Up @@ -63,6 +67,14 @@ public void setXid(String xid) {
this.xid = xid;
}

public Long getTransactionId() {
return transactionId;
}

public void setTransactionId(Long transactionId) {
this.transactionId = transactionId;
}

public String getTransactionName() {
return transactionName;
}
Expand Down Expand Up @@ -105,9 +117,13 @@ public void setVgroup(String vgroup) {

@Override
public String toString() {
return "GlobalSessionParam{" + "xid='" + xid + '\'' + ", applicationId='" + applicationId + '\'' + ", status="
+ status + ", transactionName='" + transactionName + '\'' + ", vgroup='" + vgroup + '\''
+ ", withBranch="
return "GlobalSessionParam{" + "xid='"
+ xid + '\'' + ", transactionId="
+ transactionId + ", applicationId='"
+ applicationId + '\'' + ", status="
+ status + ", transactionName='"
+ transactionName + '\'' + ", vgroup='"
+ vgroup + '\'' + ", withBranch="
+ withBranch + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void testGetName() {
Assertions.assertEquals("db", LockMode.DB.getName());
Assertions.assertEquals("redis", LockMode.REDIS.getName());
Assertions.assertEquals("raft", LockMode.RAFT.getName());
Assertions.assertEquals("rocksdb", LockMode.ROCKSDB.getName());
}

@Test
Expand All @@ -40,6 +41,8 @@ void testGet() {
Assertions.assertEquals(LockMode.REDIS, LockMode.get("REDIS"));
Assertions.assertEquals(LockMode.RAFT, LockMode.get("raft"));
Assertions.assertEquals(LockMode.RAFT, LockMode.get("Raft"));
Assertions.assertEquals(LockMode.ROCKSDB, LockMode.get("rocksdb"));
Assertions.assertEquals(LockMode.ROCKSDB, LockMode.get("RocksDB"));
}

@Test
Expand All @@ -57,6 +60,7 @@ void testContainsValid() {
Assertions.assertTrue(LockMode.contains("db"));
Assertions.assertTrue(LockMode.contains("redis"));
Assertions.assertTrue(LockMode.contains("raft"));
Assertions.assertTrue(LockMode.contains("rocksdb"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class McpGlobalSessionParamDto implements Serializable {
@McpToolParam(description = "Global transaction ID", required = false)
private String xid;

@McpToolParam(description = "Global transaction numeric ID", required = false)
private Long transactionId;

@McpToolParam(description = "applicationId", required = false)
private String applicationId;

Expand Down Expand Up @@ -95,6 +98,14 @@ public void setXid(String xid) {
this.xid = xid;
}

public Long getTransactionId() {
return transactionId;
}

public void setTransactionId(Long transactionId) {
this.transactionId = transactionId;
}

public String getTransactionName() {
return transactionName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public static McpGlobalSessionParam convertFromDtoParam(McpGlobalSessionParamDto
param.setPageNum(paramDto.getPageNum());
param.setStatus(paramDto.getStatus());
param.setXid(paramDto.getXid());
param.setTransactionId(paramDto.getTransactionId());
param.setApplicationId(paramDto.getApplicationId());
param.setTransactionName(paramDto.getTransactionName());
param.setWithBranch(paramDto.isWithBranch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,13 @@ class TransactionInfo extends React.Component<GlobalProps, TransactionInfoState>
}

searchFilterOnChange = (key: string, val: string) => {
if (key === 'transactionId') {
const transactionId = val && /^(?!0+$)\d+$/.test(val) ? val : undefined;
this.setState(prevState => ({
globalSessionParam: { ...prevState.globalSessionParam, transactionId },
}));
return;
}
if (key === 'namespace') {
const selectedNamespace = this.state.namespaceOptions.get(val);
const clusters = selectedNamespace ? selectedNamespace.clusters : [];
Expand Down Expand Up @@ -1094,6 +1101,12 @@ class TransactionInfo extends React.Component<GlobalProps, TransactionInfoState>
onChange={(value: string) => { this.searchFilterOnChange('xid', value); }}
/>
</FormItem>
<FormItem name="transactionId" label="transactionId">
<Input
placeholder={inputFilterPlaceholder}
onChange={(value: string) => { this.searchFilterOnChange('transactionId', value); }}
/>
</FormItem>
<FormItem name="applicationId" label="applicationId">
<Input
placeholder={inputFilterPlaceholder}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import qs from 'qs';

export type GlobalSessionParam = {
xid?: string,
transactionId?: string,
applicationId?: string,
status?: number,
transactionName?: string,
Expand Down
6 changes: 6 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
<janino-version>3.1.10</janino-version>
<mockwebserver-version>4.12.0</mockwebserver-version>
<native-lib-loader.version>2.4.0</native-lib-loader.version>
<rocksdbjni.version>10.10.1.1</rocksdbjni.version>

<!-- for fory -->
<fory.version>0.12.3</fory.version>
Expand Down Expand Up @@ -604,6 +605,11 @@
<artifactId>native-lib-loader</artifactId>
<version>${native-lib-loader.version}</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdbjni.version}</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@
<artifactId>seata-metrics-all</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
<!--

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ private String getWhereConditionByParam(GlobalSessionParam param, List<Object> s
whereConditionBuilder.append(" and xid = ? ");
sqlParamList.add(param.getXid());
}
if (param.getTransactionId() != null && param.getTransactionId() > 0) {
whereConditionBuilder.append(" and transaction_id = ? ");
sqlParamList.add(param.getTransactionId());
}
if (StringUtils.isNotBlank(param.getApplicationId())) {
whereConditionBuilder.append(" and application_id = ? ");
sqlParamList.add(param.getApplicationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@
package org.apache.seata.server.console.impl.file;

import org.apache.seata.common.result.PageResult;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.server.console.entity.param.GlobalSessionParam;
import org.apache.seata.server.console.entity.vo.GlobalSessionVO;
import org.apache.seata.server.console.impl.AbstractGlobalService;
import org.apache.seata.server.console.service.GlobalSessionService;
import org.apache.seata.server.session.GlobalSession;
import org.apache.seata.server.session.SessionCondition;
import org.apache.seata.server.session.SessionHolder;
import org.apache.seata.server.session.SessionManager;
import org.apache.seata.server.storage.SessionConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Objects.isNull;
import static org.apache.seata.common.util.StringUtils.isBlank;
import static org.apache.seata.common.util.StringUtils.isNotBlank;

/**
* Global Session File ServiceImpl
Expand All @@ -51,8 +56,7 @@ public PageResult<GlobalSessionVO> query(GlobalSessionParam param) {
throw new IllegalArgumentException("wrong pageSize or pageNum");
}

final Collection<GlobalSession> allSessions =
SessionHolder.getRootSessionManager().allSessions();
final Collection<GlobalSession> allSessions = findCandidateSessions(param);

final List<GlobalSession> filteredSessions =
allSessions.parallelStream().filter(obtainPredicate(param)).collect(Collectors.toList());
Expand All @@ -61,6 +65,56 @@ public PageResult<GlobalSessionVO> query(GlobalSessionParam param) {
SessionConverter.convertGlobalSession(filteredSessions), param.getPageNum(), param.getPageSize());
}

private Collection<GlobalSession> findCandidateSessions(GlobalSessionParam param) {
SessionManager sessionManager = SessionHolder.getRootSessionManager();
if (isNotBlank(param.getXid()) && isCompleteXid(param.getXid())) {
GlobalSession globalSession = sessionManager.findGlobalSession(param.getXid(), param.isWithBranch());
return globalSession == null ? Collections.emptyList() : Collections.singletonList(globalSession);
}
if (param.getTransactionId() != null && param.getTransactionId() > 0) {
SessionCondition sessionCondition = new SessionCondition();
sessionCondition.setTransactionId(param.getTransactionId());
sessionCondition.setLazyLoadBranch(!param.isWithBranch());
return sessionManager.findGlobalSessions(sessionCondition);
}
if (param.getStatus() != null) {
GlobalStatus globalStatus = getGlobalStatus(param.getStatus());
if (globalStatus == null) {
return Collections.emptyList();
}
SessionCondition sessionCondition = new SessionCondition(globalStatus);
sessionCondition.setLazyLoadBranch(!param.isWithBranch());
return sessionManager.findGlobalSessions(sessionCondition);
}
return sessionManager.allSessions();
}

private GlobalStatus getGlobalStatus(int status) {
try {
return GlobalStatus.get(status);
} catch (IllegalArgumentException e) {
return null;
}
}

private boolean isCompleteXid(String xid) {
int lastSplitIndex = xid.lastIndexOf(':');
if (lastSplitIndex <= 0 || lastSplitIndex == xid.length() - 1) {
return false;
}
int portSplitIndex = xid.lastIndexOf(':', lastSplitIndex - 1);
if (portSplitIndex <= 0 || portSplitIndex == lastSplitIndex - 1) {
return false;
}
try {
Integer.parseInt(xid.substring(portSplitIndex + 1, lastSplitIndex));
Long.parseLong(xid.substring(lastSplitIndex + 1));
return true;
} catch (NumberFormatException e) {
return false;
}
}

/**
* obtain the condition
*
Expand All @@ -71,9 +125,14 @@ private Predicate<? super GlobalSession> obtainPredicate(GlobalSessionParam para

return session -> {
return
// xid
// xid
(isBlank(param.getXid()) || session.getXid().contains(param.getXid()))
&&
// transactionId
(param.getTransactionId() == null
|| param.getTransactionId() <= 0
|| Objects.equals(session.getTransactionId(), param.getTransactionId()))
&&
// applicationId
(isBlank(param.getApplicationId())
|| session.getApplicationId().contains(param.getApplicationId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public PageResult<GlobalSessionVO> query(GlobalSessionParam param) {

RedisTransactionStoreManager instance = RedisTransactionStoreManagerFactory.getInstance();

if (isBlank(param.getXid()) && param.getStatus() == null) {
boolean hasDirectGlobalQuery = isNotBlank(param.getXid())
|| (param.getTransactionId() != null && param.getTransactionId() > 0);
if (!hasDirectGlobalQuery && param.getStatus() == null) {
total = instance.countByGlobalSessions(GlobalStatus.values());
globalSessions =
instance.findGlobalSessionByPage(param.getPageNum(), param.getPageSize(), param.isWithBranch());
Expand All @@ -79,10 +81,16 @@ public PageResult<GlobalSessionVO> query(GlobalSessionParam param) {
sessionCondition.setLazyLoadBranch(!param.isWithBranch());
globalSessions = instance.readSession(sessionCondition);
total = (long) globalSessions.size();
} else if (param.getTransactionId() != null && param.getTransactionId() > 0) {
SessionCondition sessionCondition = new SessionCondition();
sessionCondition.setTransactionId(param.getTransactionId());
sessionCondition.setLazyLoadBranch(!param.isWithBranch());
globalSessions = instance.readSession(sessionCondition);
total = (long) globalSessions.size();
}

if (param.getStatus() != null && GlobalStatus.get(param.getStatus()) != null) {
if (CollectionUtils.isNotEmpty(globalSessions)) {
if (hasDirectGlobalQuery || CollectionUtils.isNotEmpty(globalSessions)) {
globalSessionsNew = globalSessions.stream()
.filter(globalSession -> globalSession.getStatus().getCode() == (param.getStatus()))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.store.LockMode;
import org.apache.seata.common.store.StoreMode;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.server.store.StoreConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,7 +29,6 @@
public class LockerManagerFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(LockerManagerFactory.class);
private static final Configuration CONFIG = ConfigurationFactory.getInstance();

/**
* the lock manager
Expand Down Expand Up @@ -62,13 +58,10 @@ public static void init(LockMode lockMode) {
synchronized (LockerManagerFactory.class) {
if (LOCK_MANAGER == null) {
if (null == lockMode) {
lockMode = StoreConfig.getLockMode();
lockMode = StoreConfig.getEffectiveLockMode();
}
LOGGER.info("use lock store mode: {}", lockMode.getName());
// if not exist the lock mode, throw exception
if (null != StoreMode.get(lockMode.name())) {
LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode.getName());
}
LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode.getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ private int calGlobalSessionSize(

@Override
public void decode(byte[] a) {
this.branchSessions = new ArrayList<>();
this.branchSessions = isLazyLoadBranch() ? null : new ArrayList<>();
ByteBuffer byteBuffer = ByteBuffer.wrap(a);
this.transactionId = byteBuffer.getLong();
this.timeout = byteBuffer.getInt();
Expand Down
Loading
Loading