diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index c9763ddc642b..583bbe9f73c3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -20,22 +20,20 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.FileStore; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.BiPredicate; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.zip.CRC32; import com.google.common.annotations.VisibleForTesting; @@ -175,11 +173,22 @@ public boolean isStarted() return started; } - private File[] getUnmanagedFiles() + /** + * Returns segment files to replay according to the configured segment manager. + * + * @param filter An optional filter to apply to segment files returned by the segment manager. + * @return Segment files to replay, optionally filtered. + */ + public File[] getFilteredFiles(Optional> filter) { - File[] files = segmentManager.storageDirectory.tryList(unmanagedFilesFilter); + BiPredicate compositeFilter = (path, name) -> + filter.orElse((ignored1, ignored2) -> true).test(path, name) + && unmanagedFilesFilter.test(path, name); + + File[] files = segmentManager.storageDirectory.tryList(compositeFilter); if (files == null) return new File[0]; + return files; } @@ -198,7 +207,8 @@ public CommitLog forPath(File commitLogLocation) } /** - * Perform recovery on commit logs located in the directory specified by the config file. 
+ * Perform recovery on commit logs located in the directory specified by the config file, + * performing archive and restore beforehand. * The recovery is executed as a commit log read followed by a flush. * * @param flushReason the reason for flushing that fallows commit log reading, use @@ -208,12 +218,9 @@ public CommitLog forPath(File commitLogLocation) * @return keyspaces and the corresponding number of partition updates * @throws IOException */ - public Map recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason flushReason) throws IOException + public Map recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason flushReason) throws IOException { - // submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904 - // The files may have already been archived by normal CommitLog operation. This may cause errors in this - // archiving pass, which we should not treat as serious. - for (File file : getUnmanagedFiles()) + for (File file : getFilteredFiles(Optional.empty())) { archiver.maybeArchive(file.path(), file.name()); archiver.maybeWaitForArchiving(file.name()); @@ -223,7 +230,23 @@ public Map recoverSegmentsOnDisk(ColumnFamilyStore.FlushReaso archiver.maybeRestoreArchive(); // List the files again as archiver may have added segments. - File[] files = getUnmanagedFiles(); + return recoverSegmentsOnDiskNoArchive(flushReason, getFilteredFiles(Optional.empty())); + } + + /** + * Perform recovery on commit logs located in the directory specified by the config file, without archiving. + * The recovery is executed as a commit log read followed by a flush. + * + * @param flushReason the reason for flushing that follows commit log reading, use + * {@link org.apache.cassandra.db.ColumnFamilyStore.FlushReason#STARTUP} when recovering on a + * node start. Use {@link org.apache.cassandra.db.ColumnFamilyStore.FlushReason#REMOTE_REPLAY} + * when replaying commit logs to a remote storage. + * @param files The segment files to recover.
+ * @return keyspaces and the corresponding number of partition updates + * @throws IOException + */ + public Map recoverSegmentsOnDiskNoArchive(ColumnFamilyStore.FlushReason flushReason, File[] files) throws IOException + { Map replayedKeyspaces = Collections.emptyMap(); if (files.length == 0) { @@ -580,7 +603,7 @@ synchronized public void stopUnsafe(boolean deleteSegments) synchronized public Map restartUnsafe() throws IOException { started = false; - return start().recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason.STARTUP); + return start().recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason.STARTUP); } public static long freeDiskSpace() diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 7cb489452c9c..9e7317061685 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -53,7 +53,6 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SizeEstimatesRecorder; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.SystemKeyspaceMigrator40; import org.apache.cassandra.db.WindowsFailedSnapshotTracker; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.virtual.SystemViewsKeyspace; @@ -81,11 +80,11 @@ import org.apache.cassandra.security.ThreadAwareSecurityManager; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.INativeLibrary; import org.apache.cassandra.utils.JMXServerUtils; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.Mx4jTool; -import org.apache.cassandra.utils.INativeLibrary; import org.apache.cassandra.utils.WindowsTimer; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -355,7 +354,7 @@ protected void setup() // Replay any 
CommitLogSegments found on disk try { - CommitLog.instance.recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason.STARTUP); + CommitLog.instance.recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason.STARTUP); } catch (IOException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 5ca11918435c..e749185d0e35 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -67,8 +67,6 @@ import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.SystemKeyspaceMigrator40; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable; @@ -586,7 +584,7 @@ public void startup(ICluster cluster) // Replay any CommitLogSegments found on disk try { - CommitLog.instance.recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason.STARTUP); + CommitLog.instance.recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason.STARTUP); } catch (IOException e) {