diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index e31bc8e72e2..325d8f9be7c 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -204,6 +204,11 @@ public interface ConfigurationKeys { */ String SERVICE_SESSION_RELOAD_READ_SIZE = STORE_FILE_PREFIX + "sessionReloadReadSize"; + /** + * The constant STORE_FILE_ENGINE. + */ + String STORE_FILE_ENGINE = STORE_FILE_PREFIX + "engine"; + /** * The constant CLIENT_REPORT_SUCCESS_ENABLE. */ diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 20df1aa86df..96e28582ef6 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -74,6 +74,7 @@ 1.10.12 1.10.1 1.3.14 + 8.8.1 2.0 4.1.101.Final 4.0.3 @@ -790,6 +791,11 @@ jraft-core ${jraft.version} + + org.rocksdb + rocksdbjni + ${rocksdb.version} + com.github.luben zstd-jni diff --git a/server/pom.xml b/server/pom.xml index 9de1f407868..8f637258bda 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -282,6 +282,10 @@ + + org.rocksdb + rocksdbjni + com.alipay.sofa bolt diff --git a/server/src/main/java/org/apache/seata/server/storage/file/FileStoreEngine.java b/server/src/main/java/org/apache/seata/server/storage/file/FileStoreEngine.java new file mode 100644 index 00000000000..2bdc2c6539f --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/storage/file/FileStoreEngine.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.file; + +/** + * The local file mode storage engine. + */ +public enum FileStoreEngine { + + /** + * Append-only data file engine. + */ + FILE("file"), + + /** + * RocksDB embedded key-value engine. + */ + ROCKSDB("rocksdb"); + + private final String name; + + FileStoreEngine(String name) { + this.name = name; + } + + public static FileStoreEngine get(String name) { + for (FileStoreEngine engine : FileStoreEngine.values()) { + if (engine.getName().equalsIgnoreCase(name)) { + return engine; + } + } + throw new IllegalArgumentException("unknown file store engine:" + name); + } + + public String getName() { + return name; + } +} diff --git a/server/src/main/java/org/apache/seata/server/storage/file/session/FileSessionManager.java b/server/src/main/java/org/apache/seata/server/storage/file/session/FileSessionManager.java index 0836432f048..1063d830ad4 100644 --- a/server/src/main/java/org/apache/seata/server/storage/file/session/FileSessionManager.java +++ b/server/src/main/java/org/apache/seata/server/storage/file/session/FileSessionManager.java @@ -30,11 +30,14 @@ import org.apache.seata.server.session.GlobalSession; import org.apache.seata.server.session.Reloadable; import org.apache.seata.server.session.SessionCondition; +import org.apache.seata.server.storage.file.FileStoreEngine; import org.apache.seata.server.storage.file.ReloadableStore; import org.apache.seata.server.storage.file.TransactionWriteStore; import org.apache.seata.server.storage.file.store.FileTransactionStoreManager; +import org.apache.seata.server.storage.file.store.RocksDBTransactionStoreManager; import org.apache.seata.server.store.AbstractTransactionStoreManager; import org.apache.seata.server.store.SessionStorable; +import org.apache.seata.server.store.StoreConfig; import org.apache.seata.server.store.TransactionStoreManager; import java.io.File; @@ -62,11 +65,28 @@ public class FileSessionManager extends AbstractSessionManager implements Reload private static final int READ_SIZE = ConfigurationFactory.getInstance() .getInt(ConfigurationKeys.SERVICE_SESSION_RELOAD_READ_SIZE, DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE); + private static final GlobalStatus[] ACTIVE_STATUSES = { + GlobalStatus.UnKnown, + GlobalStatus.Begin, + GlobalStatus.Committing, + GlobalStatus.CommitRetrying, + GlobalStatus.Rollbacking, + GlobalStatus.RollbackRetrying, + GlobalStatus.TimeoutRollbacking, + GlobalStatus.TimeoutRollbackRetrying, + GlobalStatus.AsyncCommitting, + GlobalStatus.StopRollbackOrRollbackRetry, + GlobalStatus.StopCommitOrCommitRetry, + GlobalStatus.Deleting + }; + /** * The Session map. */ protected Map sessionMap = new ConcurrentHashMap<>(64); + private final FileStoreEngine fileStoreEngine; + /** * Instantiates a new File based session manager. * @@ -74,6 +94,7 @@ public class FileSessionManager extends AbstractSessionManager implements Reload */ public FileSessionManager(String name) { super(name); + fileStoreEngine = FileStoreEngine.FILE; transactionStoreManager = new AbstractTransactionStoreManager() { @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { @@ -91,9 +112,15 @@ public boolean writeSession(LogOperation logOperation, SessionStorable session) */ public FileSessionManager(String name, String sessionStoreFilePath) throws IOException { super(name); + fileStoreEngine = StoreConfig.getFileStoreEngine(); if (StringUtils.isNotBlank(sessionStoreFilePath)) { - transactionStoreManager = - new FileTransactionStoreManager(sessionStoreFilePath + File.separator + name, this); + if (fileStoreEngine == FileStoreEngine.ROCKSDB) { + transactionStoreManager = + new RocksDBTransactionStoreManager(sessionStoreFilePath + File.separator + name + ".rocksdb"); + } else { + transactionStoreManager = + new FileTransactionStoreManager(sessionStoreFilePath + File.separator + name, this); + } } else { transactionStoreManager = new AbstractTransactionStoreManager() { @Override @@ -106,11 +133,18 @@ public boolean writeSession(LogOperation logOperation, SessionStorable session) @Override public void reload() { + if (isRocksDBStore()) { + return; + } restoreSessions(); } @Override public void addGlobalSession(GlobalSession session) throws TransactionException { + if (isRocksDBStore()) { + super.addGlobalSession(session); + return; + } CollectionUtils.computeIfAbsent(sessionMap, session.getXid(), k -> { try { super.addGlobalSession(session); @@ -123,17 +157,27 @@ public void addGlobalSession(GlobalSession session) throws TransactionException @Override public GlobalSession findGlobalSession(String xid) { + if (isRocksDBStore()) { + return transactionStoreManager.readSession(xid); + } return sessionMap.get(xid); } @Override public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) { + if (isRocksDBStore()) { + return transactionStoreManager.readSession(xid, withBranchSessions); + } // withBranchSessions without process in memory return sessionMap.get(xid); } @Override public void removeGlobalSession(GlobalSession session) throws TransactionException { + if (isRocksDBStore()) { + super.removeGlobalSession(session); + return; + } if (sessionMap.remove(session.getXid()) != null) { super.removeGlobalSession(session); } @@ -141,11 +185,17 @@ public void removeGlobalSession(GlobalSession session) throws TransactionExcepti @Override public Collection allSessions() { + if (isRocksDBStore()) { + return findGlobalSessions(new SessionCondition(ACTIVE_STATUSES)); + } return sessionMap.values(); } @Override public List findGlobalSessions(SessionCondition condition) { + if (isRocksDBStore()) { + return transactionStoreManager.readSession(condition); + } List globalStatuses = null; if (null != condition.getStatuses() && condition.getStatuses().length > 0) { globalStatuses = Arrays.asList(condition.getStatuses()); @@ -385,6 +435,10 @@ public void setSessionMap(Map sessionMap) { this.sessionMap = sessionMap; } + private boolean isRocksDBStore() { + return fileStoreEngine == FileStoreEngine.ROCKSDB; + } + @Override public void destroy() { transactionStoreManager.shutdown(); diff --git a/server/src/main/java/org/apache/seata/server/storage/file/store/RocksDBTransactionStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/file/store/RocksDBTransactionStoreManager.java new file mode 100644 index 00000000000..89b509cd1ba --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/storage/file/store/RocksDBTransactionStoreManager.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.file.store; + +import org.apache.seata.common.exception.StoreException; +import org.apache.seata.common.util.CollectionUtils; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.server.session.BranchSession; +import org.apache.seata.server.session.GlobalSession; +import org.apache.seata.server.session.SessionCondition; +import org.apache.seata.server.storage.file.FlushDiskMode; +import org.apache.seata.server.store.AbstractTransactionStoreManager; +import org.apache.seata.server.store.SessionStorable; +import org.apache.seata.server.store.StoreConfig; +import org.apache.seata.server.store.TransactionStoreManager; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.seata.common.DefaultValues.DEFAULT_QUERY_LIMIT; + +/** + * RocksDB transaction store for local file mode. + */ +public class RocksDBTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBTransactionStoreManager.class); + + private static final byte[] EMPTY_VALUE = new byte[0]; + private static final String GLOBAL_PREFIX = "g|"; + private static final String TRANSACTION_PREFIX = "t|"; + private static final String BRANCH_PREFIX = "b|"; + private static final String STATUS_PREFIX = "s|"; + private static final int PADDED_LONG_LENGTH = 19; + + private final Options options; + private final WriteOptions writeOptions; + private final RocksDB db; + private final int logQueryLimit; + + static { + RocksDB.loadLibrary(); + } + + public RocksDBTransactionStoreManager(String dbPath) throws IOException { + this(dbPath, StoreConfig.getFlushDiskMode()); + } + + public RocksDBTransactionStoreManager(String dbPath, FlushDiskMode flushDiskMode) throws IOException { + try { + File dbDir = new File(dbPath); + File parent = dbDir.getParentFile(); + if (parent != null && !parent.exists()) { + parent.mkdirs(); + } + options = new Options().setCreateIfMissing(true); + writeOptions = new WriteOptions().setSync(flushDiskMode == FlushDiskMode.SYNC_MODEL); + db = RocksDB.open(options, dbPath); + logQueryLimit = DEFAULT_QUERY_LIMIT; + } catch (RocksDBException e) { + throw new IOException("init rocksdb store error, path:" + dbPath, e); + } + } + + @Override + public boolean writeSession(LogOperation logOperation, SessionStorable session) { + try { + switch (logOperation) { + case GLOBAL_ADD: + return insertGlobalSession((GlobalSession) session); + case GLOBAL_UPDATE: + return updateGlobalSession((GlobalSession) session); + case GLOBAL_REMOVE: + return deleteGlobalSession((GlobalSession) session); + case BRANCH_ADD: + return insertBranchSession((BranchSession) session); + case BRANCH_UPDATE: + return updateBranchSession((BranchSession) session); + case BRANCH_REMOVE: + return deleteBranchSession((BranchSession) session); + default: + throw new StoreException("Unknown LogOperation:" + logOperation.name()); + } + } catch (RocksDBException e) { + LOGGER.error("write rocksdb store failed, operation: {}", logOperation, e); + return false; + } + } + + private boolean insertGlobalSession(GlobalSession globalSession) throws RocksDBException { + byte[] globalKey = globalKey(globalSession.getXid()); + if (db.get(globalKey) != null) { + return false; + } + try (WriteBatch batch = new WriteBatch()) { + putGlobal(batch, globalSession); + db.write(writeOptions, batch); + return true; + } + } + + private boolean updateGlobalSession(GlobalSession globalSession) throws RocksDBException { + GlobalSession previous = readGlobalSession(globalSession.getXid(), false); + if (previous == null) { + return false; + } + if (globalSession.getExpectedStatus() != null && globalSession.getExpectedStatus() != previous.getStatus()) { + return false; + } + try (WriteBatch batch = new WriteBatch()) { + deleteStatusIndex(batch, previous); + putGlobal(batch, globalSession); + db.write(writeOptions, batch); + return true; + } + } + + private boolean deleteGlobalSession(GlobalSession globalSession) throws RocksDBException { + GlobalSession previous = readGlobalSession(globalSession.getXid(), false); + if (previous == null) { + return true; + } + try (WriteBatch batch = new WriteBatch()) { + batch.delete(globalKey(previous.getXid())); + batch.delete(transactionKey(previous.getTransactionId())); + deleteStatusIndex(batch, previous); + deleteBranches(batch, previous.getXid()); + db.write(writeOptions, batch); + return true; + } + } + + private boolean insertBranchSession(BranchSession branchSession) throws RocksDBException { + byte[] branchKey = branchKey(branchSession.getXid(), branchSession.getBranchId()); + if (db.get(branchKey) != null) { + return false; + } + db.put(writeOptions, branchKey, branchSession.encode()); + return true; + } + + private boolean updateBranchSession(BranchSession branchSession) throws RocksDBException { + byte[] branchKey = branchKey(branchSession.getXid(), branchSession.getBranchId()); + if (db.get(branchKey) == null) { + return false; + } + db.put(writeOptions, branchKey, branchSession.encode()); + return true; + } + + private boolean deleteBranchSession(BranchSession branchSession) throws RocksDBException { + db.delete(writeOptions, branchKey(branchSession.getXid(), branchSession.getBranchId())); + return true; + } + + private void putGlobal(WriteBatch batch, GlobalSession globalSession) throws RocksDBException { + batch.put(globalKey(globalSession.getXid()), globalSession.encode()); + batch.put(transactionKey(globalSession.getTransactionId()), bytes(globalSession.getXid())); + batch.put(statusKey(globalSession), EMPTY_VALUE); + } + + private void deleteBranches(WriteBatch batch, String xid) throws RocksDBException { + byte[] prefix = branchPrefix(xid); + try (RocksIterator iterator = db.newIterator()) { + for (iterator.seek(prefix); iterator.isValid() && startsWith(iterator.key(), prefix); iterator.next()) { + batch.delete(iterator.key()); + } + iterator.status(); + } + } + + private void deleteStatusIndex(WriteBatch batch, GlobalSession globalSession) throws RocksDBException { + batch.delete(statusKey(globalSession)); + } + + @Override + public GlobalSession readSession(String xid) { + return readSession(xid, true); + } + + @Override + public GlobalSession readSession(String xid, boolean withBranchSessions) { + try { + return readGlobalSession(xid, withBranchSessions); + } catch (RocksDBException e) { + throw new StoreException(e); + } + } + + private GlobalSession readGlobalSession(String xid, boolean withBranchSessions) throws RocksDBException { + byte[] globalBytes = db.get(globalKey(xid)); + if (globalBytes == null) { + return null; + } + GlobalSession globalSession = decodeGlobalSession(globalBytes, !withBranchSessions); + if (withBranchSessions) { + for (BranchSession branchSession : readBranchSessions(xid)) { + globalSession.add(branchSession); + } + } + return globalSession; + } + + @Override + public List readSortByTimeoutBeginSessions(boolean withBranchSessions) { + return readSession(new GlobalStatus[] {GlobalStatus.Begin}, withBranchSessions); + } + + @Override + public List readSession(GlobalStatus[] statuses, boolean withBranchSessions) { + return readSession(statuses, withBranchSessions, null); + } + + @Override + public List readSession(SessionCondition sessionCondition) { + if (StringUtils.isNotBlank(sessionCondition.getXid())) { + GlobalSession globalSession = readSession(sessionCondition.getXid(), !sessionCondition.isLazyLoadBranch()); + return globalSession == null ? Collections.emptyList() : Collections.singletonList(globalSession); + } + if (sessionCondition.getTransactionId() != null) { + GlobalSession globalSession = readSessionByTransactionId( + sessionCondition.getTransactionId(), !sessionCondition.isLazyLoadBranch()); + return globalSession == null ? Collections.emptyList() : Collections.singletonList(globalSession); + } + if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) { + return readSession( + sessionCondition.getStatuses(), + !sessionCondition.isLazyLoadBranch(), + sessionCondition.getOverTimeAliveMills()); + } + if (sessionCondition.getOverTimeAliveMills() != null) { + return readAllSessions(!sessionCondition.isLazyLoadBranch(), sessionCondition.getOverTimeAliveMills()); + } + return Collections.emptyList(); + } + + private GlobalSession readSessionByTransactionId(long transactionId, boolean withBranchSessions) { + try { + byte[] xidBytes = db.get(transactionKey(transactionId)); + if (xidBytes == null) { + return null; + } + return readGlobalSession(new String(xidBytes, StandardCharsets.UTF_8), withBranchSessions); + } catch (RocksDBException e) { + throw new StoreException(e); + } + } + + private List readSession( + GlobalStatus[] statuses, boolean withBranchSessions, Long overTimeAliveMills) { + if (statuses == null || statuses.length == 0 || logQueryLimit <= 0) { + return Collections.emptyList(); + } + List globalSessions = new ArrayList<>(); + long now = System.currentTimeMillis(); + try (RocksIterator iterator = db.newIterator()) { + for (GlobalStatus status : statuses) { + byte[] prefix = statusPrefix(status); + for (iterator.seek(prefix); iterator.isValid() && startsWith(iterator.key(), prefix); iterator.next()) { + GlobalSession globalSession = + readGlobalSession(xidFromStatusKey(iterator.key()), withBranchSessions); + if (globalSession == null) { + continue; + } + if (overTimeAliveMills != null && now - globalSession.getBeginTime() <= overTimeAliveMills) { + break; + } + globalSessions.add(globalSession); + if (globalSessions.size() >= logQueryLimit) { + return globalSessions; + } + } + } + iterator.status(); + return globalSessions; + } catch (RocksDBException e) { + throw new StoreException(e); + } + } + + private List readAllSessions(boolean withBranchSessions, Long overTimeAliveMills) { + List globalSessions = new ArrayList<>(); + long now = System.currentTimeMillis(); + byte[] prefix = bytes(GLOBAL_PREFIX); + try (RocksIterator iterator = db.newIterator()) { + for (iterator.seek(prefix); iterator.isValid() && startsWith(iterator.key(), prefix); iterator.next()) { + GlobalSession globalSession = decodeGlobalSession(iterator.value(), !withBranchSessions); + if (overTimeAliveMills != null && now - globalSession.getBeginTime() <= overTimeAliveMills) { + continue; + } + if (withBranchSessions) { + for (BranchSession branchSession : readBranchSessions(globalSession.getXid())) { + globalSession.add(branchSession); + } + } + globalSessions.add(globalSession); + if (globalSessions.size() >= logQueryLimit) { + return globalSessions; + } + } + iterator.status(); + return globalSessions; + } catch (RocksDBException e) { + throw new StoreException(e); + } + } + + private List readBranchSessions(String xid) throws RocksDBException { + List branchSessions = new ArrayList<>(); + byte[] prefix = branchPrefix(xid); + try (RocksIterator iterator = db.newIterator()) { + for (iterator.seek(prefix); iterator.isValid() && startsWith(iterator.key(), prefix); iterator.next()) { + BranchSession branchSession = new BranchSession(); + branchSession.decode(iterator.value()); + branchSessions.add(branchSession); + } + iterator.status(); + } + return branchSessions; + } + + private GlobalSession decodeGlobalSession(byte[] globalBytes, boolean lazyLoadBranch) { + GlobalSession decoded = new GlobalSession(); + decoded.decode(globalBytes); + if (!lazyLoadBranch) { + return decoded; + } + GlobalSession lazySession = new GlobalSession( + decoded.getApplicationId(), + decoded.getTransactionServiceGroup(), + decoded.getTransactionName(), + decoded.getTimeout(), + true); + lazySession.setTransactionId(decoded.getTransactionId()); + lazySession.setXid(decoded.getXid()); + lazySession.setStatus(decoded.getStatus()); + lazySession.setBeginTime(decoded.getBeginTime()); + lazySession.setApplicationData(decoded.getApplicationData()); + return lazySession; + } + + @Override + public void shutdown() { + db.close(); + writeOptions.close(); + options.close(); + } + + private byte[] globalKey(String xid) { + return bytes(GLOBAL_PREFIX + xid); + } + + private byte[] transactionKey(long transactionId) { + return bytes(TRANSACTION_PREFIX + paddedLong(transactionId)); + } + + private byte[] branchKey(String xid, long branchId) { + return bytes(BRANCH_PREFIX + xid + "|" + paddedLong(branchId)); + } + + private byte[] branchPrefix(String xid) { + return bytes(BRANCH_PREFIX + xid + "|"); + } + + private byte[] statusKey(GlobalSession globalSession) { + return bytes(statusPrefixString(globalSession.getStatus()) + + paddedLong(globalSession.getBeginTime()) + + "|" + + globalSession.getXid()); + } + + private byte[] statusPrefix(GlobalStatus status) { + return bytes(statusPrefixString(status)); + } + + private String statusPrefixString(GlobalStatus status) { + return STATUS_PREFIX + status.getCode() + "|"; + } + + private String xidFromStatusKey(byte[] key) { + String keyString = new String(key, StandardCharsets.UTF_8); + return keyString.substring(keyString.lastIndexOf('|') + 1); + } + + private String paddedLong(long value) { + return String.format("%0" + PADDED_LONG_LENGTH + "d", value); + } + + private byte[] bytes(String value) { + return value.getBytes(StandardCharsets.UTF_8); + } + + private boolean startsWith(byte[] value, byte[] prefix) { + if (value.length < prefix.length) { + return false; + } + for (int i = 0; i < prefix.length; i++) { + if (value[i] != prefix[i]) { + return false; + } + } + return true; + } +} diff --git a/server/src/main/java/org/apache/seata/server/store/StoreConfig.java b/server/src/main/java/org/apache/seata/server/store/StoreConfig.java index 89f1e3f3fea..2c496cb3f86 100644 --- a/server/src/main/java/org/apache/seata/server/store/StoreConfig.java +++ b/server/src/main/java/org/apache/seata/server/store/StoreConfig.java @@ -24,6 +24,7 @@ import org.apache.seata.config.ConfigurationFactory; import org.apache.seata.core.constants.ConfigurationKeys; import org.apache.seata.server.env.ContainerHelper; +import org.apache.seata.server.storage.file.FileStoreEngine; import org.apache.seata.server.storage.file.FlushDiskMode; import static org.apache.seata.common.DefaultValues.SERVER_DEFAULT_STORE_MODE; @@ -72,6 +73,8 @@ public static void setStartupParameter(String storeMode, String sessionMode, Str */ private static final int DEFAULT_WRITE_BUFFER_SIZE = 1024 * 16; + private static final String DEFAULT_FILE_STORE_ENGINE = FileStoreEngine.FILE.getName(); + public static int getMaxBranchSessionSize() { return CONFIGURATION.getInt(STORE_FILE_PREFIX + "maxBranchSessionSize", DEFAULT_MAX_BRANCH_SESSION_SIZE); } @@ -88,6 +91,11 @@ public static FlushDiskMode getFlushDiskMode() { return FlushDiskMode.findDiskMode(CONFIGURATION.getConfig(STORE_FILE_PREFIX + "flushDiskMode")); } + public static FileStoreEngine getFileStoreEngine() { + return FileStoreEngine.get( + CONFIGURATION.getConfig(ConfigurationKeys.STORE_FILE_ENGINE, DEFAULT_FILE_STORE_ENGINE)); + } + /** * only for inner call * diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index eff0d0bbbe8..40155201059 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -189,6 +189,8 @@ seata: lock: mode: file file: + # support: file, rocksdb + engine: file dir: sessionStore max-branch-session-size: 16384 max-global-session-size: 512 diff --git a/server/src/main/resources/application.raft.example.yml b/server/src/main/resources/application.raft.example.yml index 1016333b28a..b0653a34281 100644 --- a/server/src/main/resources/application.raft.example.yml +++ b/server/src/main/resources/application.raft.example.yml @@ -161,6 +161,8 @@ seata: # support: file mode: raft file: + # support: file, rocksdb + engine: file dir: sessionStore max-branch-session-size: 16384 max-global-session-size: 512 diff --git a/server/src/test/java/org/apache/seata/server/store/file/RocksDBTransactionStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/store/file/RocksDBTransactionStoreManagerTest.java new file mode 100644 index 00000000000..766f9e5f7ba --- /dev/null +++ b/server/src/test/java/org/apache/seata/server/store/file/RocksDBTransactionStoreManagerTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.store.file; + +import org.apache.commons.io.FileUtils; +import org.apache.seata.core.model.BranchStatus; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.server.BaseSpringBootTest; +import org.apache.seata.server.session.BranchSession; +import org.apache.seata.server.session.GlobalSession; +import org.apache.seata.server.session.SessionCondition; +import org.apache.seata.server.storage.file.FlushDiskMode; +import org.apache.seata.server.storage.file.store.RocksDBTransactionStoreManager; +import org.apache.seata.server.store.TransactionStoreManager; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.List; + +public class RocksDBTransactionStoreManagerTest extends BaseSpringBootTest { + + @TempDir + private Path tempDir; + + private RocksDBTransactionStoreManager storeManager; + + @AfterEach + public void tearDown() throws Exception { + if (storeManager != null) { + storeManager.shutdown(); + } + FileUtils.deleteDirectory(tempDir.toFile()); + } + + @Test + public void testWriteReadAndRemoveSessions() throws Exception { + storeManager = newStoreManager(); + GlobalSession globalSession = newGlobalSession(); + BranchSession branchSession = newBranchSession(globalSession); + + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.GLOBAL_ADD, globalSession)); + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.BRANCH_ADD, branchSession)); + + GlobalSession found = storeManager.readSession(globalSession.getXid()); + Assertions.assertNotNull(found); + Assertions.assertEquals(globalSession.getTransactionId(), found.getTransactionId()); + Assertions.assertEquals(globalSession.getStatus(), found.getStatus()); + Assertions.assertEquals(1, found.getBranchSessions().size()); + Assertions.assertEquals( + branchSession.getBranchId(), found.getBranchSessions().get(0).getBranchId()); + + GlobalSession lazyFound = storeManager.readSession(globalSession.getXid(), false); + Assertions.assertTrue(lazyFound.isLazyLoadBranch()); + + globalSession.setStatus(GlobalStatus.Rollbacking); + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.GLOBAL_UPDATE, globalSession)); + Assertions.assertEquals( + GlobalStatus.Rollbacking, + storeManager.readSession(globalSession.getXid()).getStatus()); + + branchSession.setStatus(BranchStatus.PhaseTwo_Rollbacked); + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.BRANCH_UPDATE, branchSession)); + Assertions.assertEquals( + BranchStatus.PhaseTwo_Rollbacked, + storeManager + .readSession(globalSession.getXid()) + .getBranchSessions() + .get(0) + .getStatus()); + + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.BRANCH_REMOVE, branchSession)); + Assertions.assertTrue(storeManager + .readSession(globalSession.getXid()) + .getBranchSessions() + .isEmpty()); + + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.GLOBAL_REMOVE, globalSession)); + Assertions.assertNull(storeManager.readSession(globalSession.getXid())); + } + + @Test + public void testReadSessionByConditions() throws Exception { + storeManager = newStoreManager(); + GlobalSession beginSession = newGlobalSession(); + GlobalSession committingSession = newGlobalSession(); + committingSession.setStatus(GlobalStatus.Committing); + + Assertions.assertTrue(storeManager.writeSession(TransactionStoreManager.LogOperation.GLOBAL_ADD, beginSession)); + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.GLOBAL_ADD, committingSession)); + + SessionCondition xidCondition = new SessionCondition(beginSession.getXid()); + Assertions.assertEquals(1, storeManager.readSession(xidCondition).size()); + + SessionCondition transactionIdCondition = new SessionCondition(); + transactionIdCondition.setTransactionId(committingSession.getTransactionId()); + Assertions.assertEquals( + 1, storeManager.readSession(transactionIdCondition).size()); + + SessionCondition statusCondition = new SessionCondition(GlobalStatus.Begin); + List beginSessions = storeManager.readSession(statusCondition); + Assertions.assertEquals(1, beginSessions.size()); + Assertions.assertEquals(beginSession.getXid(), beginSessions.get(0).getXid()); + + SessionCondition overtimeCondition = new SessionCondition(GlobalStatus.Begin); + overtimeCondition.setOverTimeAliveMills(1L); + Assertions.assertEquals(1, storeManager.readSession(overtimeCondition).size()); + } + + @Test + public void testDataSurvivesRestart() throws Exception { + storeManager = newStoreManager(); + GlobalSession globalSession = newGlobalSession(); + BranchSession branchSession = newBranchSession(globalSession); + + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.GLOBAL_ADD, globalSession)); + Assertions.assertTrue( + storeManager.writeSession(TransactionStoreManager.LogOperation.BRANCH_ADD, branchSession)); + storeManager.shutdown(); + + storeManager = newStoreManager(); + GlobalSession found = storeManager.readSession(globalSession.getXid()); + Assertions.assertNotNull(found); + Assertions.assertEquals(globalSession.getTransactionId(), found.getTransactionId()); + Assertions.assertEquals(1, found.getBranchSessions().size()); + } + + private RocksDBTransactionStoreManager newStoreManager() throws Exception { + return new RocksDBTransactionStoreManager( + tempDir.resolve("session-rocksdb").toString(), FlushDiskMode.ASYNC_MODEL); + } + + private GlobalSession newGlobalSession() { + GlobalSession globalSession = new GlobalSession("test-app", "default_tx_group", "test", 60000); + globalSession.setBeginTime(System.currentTimeMillis() - 1000); + return globalSession; + } + + private BranchSession newBranchSession(GlobalSession globalSession) { + BranchSession branchSession = new BranchSession(BranchType.AT); + branchSession.setXid(globalSession.getXid()); + branchSession.setTransactionId(globalSession.getTransactionId()); + branchSession.setBranchId(globalSession.getTransactionId() + 1); + branchSession.setResourceId("jdbc:mysql://127.0.0.1:3306/seata"); + branchSession.setLockKey("account:1"); + branchSession.setClientId("client"); + branchSession.setApplicationData("applicationData"); + branchSession.setStatus(BranchStatus.Registered); + return branchSession; + } +} diff --git a/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFileProperties.java b/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFileProperties.java index 3a33624312a..8345e92a118 100644 --- a/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFileProperties.java +++ b/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFileProperties.java @@ -31,6 +31,7 @@ public class StoreFileProperties { private Integer fileWriteBufferCacheSize = 16384; private Integer sessionReloadReadSize = DEFAULT_SERVICE_SESSION_RELOAD_READ_SIZE; private String flushDiskMode = "async"; + private String engine = "file"; public String getDir() { return dir; @@ -85,4 +86,13 @@ public StoreFileProperties setFlushDiskMode(String flushDiskMode) { this.flushDiskMode = flushDiskMode; return this; } + + public String getEngine() { + return engine; + } + + public StoreFileProperties setEngine(String engine) { + this.engine = engine; + return this; + } } diff --git a/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFilePropertiesTest.java b/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFilePropertiesTest.java index 9f2c949b412..3043bdf659d 100644 --- a/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFilePropertiesTest.java +++ b/spring/seata-spring-autoconfigure/seata-spring-autoconfigure-server/src/test/java/org/apache/seata/spring/boot/autoconfigure/properties/server/store/StoreFilePropertiesTest.java @@ -30,6 +30,7 @@ public void testStoreFileProperties() { storeFileProperties.setMaxBranchSessionSize(1); storeFileProperties.setMaxGlobalSessionSize(1); storeFileProperties.setSessionReloadReadSize(1); + storeFileProperties.setEngine("rocksdb"); Assertions.assertEquals("dir", storeFileProperties.getDir()); Assertions.assertEquals("disk", storeFileProperties.getFlushDiskMode()); @@ -37,5 +38,6 @@ public void testStoreFileProperties() { Assertions.assertEquals(1, storeFileProperties.getMaxGlobalSessionSize()); Assertions.assertEquals(1, storeFileProperties.getMaxBranchSessionSize()); Assertions.assertEquals(1, storeFileProperties.getSessionReloadReadSize()); + Assertions.assertEquals("rocksdb", storeFileProperties.getEngine()); } }