Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileStore;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -175,11 +173,22 @@ public boolean isStarted()
return started;
}

private File[] getUnmanagedFiles()
/**
* Returns segment files to replay according to the configured segment manager.
*
* @param filter An optional filter to apply to segment files returned by the segment manager.
* @return Segment files to replay, optionally filtered.
*/
public File[] getFilteredFiles(Optional<BiPredicate<File, String>> filter)
{
File[] files = segmentManager.storageDirectory.tryList(unmanagedFilesFilter);
BiPredicate<File, String> compositeFilter = (path, name) ->
filter.orElse((ignored1, ignored2) -> true).test(path, name)
&& unmanagedFilesFilter.test(path, name);

File[] files = segmentManager.storageDirectory.tryList(compositeFilter);
if (files == null)
return new File[0];

return files;
}

Expand All @@ -198,7 +207,8 @@ public CommitLog forPath(File commitLogLocation)
}

/**
* Perform recovery on commit logs located in the directory specified by the config file.
* Perform recovery on commit logs located in the directory specified by the config file,
* performing archive and restore before.
* The recovery is executed as a commit log read followed by a flush.
*
* @param flushReason the reason for flushing that fallows commit log reading, use
Expand All @@ -208,12 +218,9 @@ public CommitLog forPath(File commitLogLocation)
* @return keyspaces and the corresponding number of partition updates
* @throws IOException
*/
public Map<Keyspace, Integer> recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason flushReason) throws IOException
public Map<Keyspace, Integer> recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason flushReason) throws IOException
{
// submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904
// The files may have already been archived by normal CommitLog operation. This may cause errors in this
// archiving pass, which we should not treat as serious.
for (File file : getUnmanagedFiles())
for (File file : getFilteredFiles(Optional.empty()))
{
archiver.maybeArchive(file.path(), file.name());
archiver.maybeWaitForArchiving(file.name());
Expand All @@ -223,7 +230,23 @@ public Map<Keyspace, Integer> recoverSegmentsOnDisk(ColumnFamilyStore.FlushReaso
archiver.maybeRestoreArchive();

// List the files again as archiver may have added segments.
File[] files = getUnmanagedFiles();
return recoverSegmentsOnDiskNoArchive(flushReason, getFilteredFiles(Optional.empty()));
}

/**
* Perform recovery on commit logs located in the directory specified by the config file, without archiving.
* The recovery is executed as a commit log read followed by a flush.
*
* @param flushReason the reason for flushing that fallows commit log reading, use
* {@link org.apache.cassandra.db.ColumnFamilyStore.FlushReason#STARTUP} when recovering on a
* node start. Use {@link org.apache.cassandra.db.ColumnFamilyStore.FlushReason#REMOTE_REPLAY}
* when replying commit logs to a remote storage.
* @param files THe segment files to recovery.
* @return keyspaces and the corresponding number of partition updates
* @throws IOException
*/
public Map<Keyspace, Integer> recoverSegmentsOnDiskNoArchive(ColumnFamilyStore.FlushReason flushReason, File[] files) throws IOException
{
Map<Keyspace, Integer> replayedKeyspaces = Collections.emptyMap();
if (files.length == 0)
{
Expand Down Expand Up @@ -580,7 +603,7 @@ synchronized public void stopUnsafe(boolean deleteSegments)
synchronized public Map<Keyspace, Integer> restartUnsafe() throws IOException
{
started = false;
return start().recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason.STARTUP);
return start().recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason.STARTUP);
}

public static long freeDiskSpace()
Expand Down
5 changes: 2 additions & 3 deletions src/java/org/apache/cassandra/service/CassandraDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SizeEstimatesRecorder;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspaceMigrator40;
import org.apache.cassandra.db.WindowsFailedSnapshotTracker;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
Expand Down Expand Up @@ -81,11 +80,11 @@
import org.apache.cassandra.security.ThreadAwareSecurityManager;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.INativeLibrary;
import org.apache.cassandra.utils.JMXServerUtils;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Mx4jTool;
import org.apache.cassandra.utils.INativeLibrary;
import org.apache.cassandra.utils.WindowsTimer;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -355,7 +354,7 @@ protected void setup()
// Replay any CommitLogSegments found on disk
try
{
CommitLog.instance.recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason.STARTUP);
CommitLog.instance.recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason.STARTUP);
}
catch (IOException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspaceMigrator40;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.memtable.AbstractAllocatorMemtable;
Expand Down Expand Up @@ -586,7 +584,7 @@ public void startup(ICluster cluster)
// Replay any CommitLogSegments found on disk
try
{
CommitLog.instance.recoverSegmentsOnDisk(ColumnFamilyStore.FlushReason.STARTUP);
CommitLog.instance.recoverSegmentsOnDiskWithArchive(ColumnFamilyStore.FlushReason.STARTUP);
}
catch (IOException e)
{
Expand Down