Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public ECChronos(//NOPMD long parameter list
.withTableStorageStates(myECChronosInternals.getTableStorageStates())
.withRepairLockType(configuration.getRepairConfig().getRepairLockType())
.withTimeBasedRunPolicy(myTimeBasedRunPolicy)
.withThreadPoolSize(nativeConnectionProvider.getNodes().size())
.build();

AbstractRepairConfigurationProvider repairConfigurationProvider = new FileBasedRepairConfiguration(applicationContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public class JolokiaNotificationController

private final Map<NotificationListener, String> myJolokiaRelationshipListeners = new HashMap<>();

private final ScheduledExecutorService myNotificationExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("NotificationRefresher-%d").build());
private final ScheduledExecutorService myNotificationExecutor;

private final Map<UUID, Map<String, String>> myClientIdMap = new ConcurrentHashMap<>();
private final Map<UUID, Map<String, NotificationListener>> myNodeListenersMap = new ConcurrentHashMap<>();
Expand All @@ -91,6 +90,10 @@ public JolokiaNotificationController(final Builder builder)
myRunDelay = builder.myRunDelay;
myIpTranslator = builder.myIpTranslator;
myCertificateHandler = builder.myCertificateHandler;
int nodeCount = myNativeConnectionProvider.getNodes().size();
myNotificationExecutor = Executors.newScheduledThreadPool(
Math.max(nodeCount, 1),
new ThreadFactoryBuilder().setNameFormat("NotificationRefresher-%d").setDaemon(true).build());
}

private HttpClient buildHttpClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider;
import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory;
import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockContentionException;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -88,10 +89,12 @@ public final class CASLockFactory implements LockFactory, Closeable

CASLockFactory(final CASLockFactoryBuilder builder)
{
Map<UUID, Node> nodes = builder.getNativeConnectionProvider().getNodes();
int nodeCount = nodes != null ? Math.max(1, nodes.size()) : 1;
myCasLockProperties = new CASLockProperties(
builder.getNativeConnectionProvider().getConnectionType(),
builder.getKeyspaceName(),
Executors.newSingleThreadScheduledExecutor(
Executors.newScheduledThreadPool(nodeCount,
new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()),
builder.getConsistencyType(),
builder.getNativeConnectionProvider().getCqlSession());
Expand Down Expand Up @@ -293,7 +296,7 @@ private DistributedLock doTryLock(final String dataCenter,
}
else
{
throw new LockException(String.format("Unable to lock resource %s in datacenter %s", resource, dataCenter));
throw new LockContentionException(String.format("Unable to lock resource %s in datacenter %s", resource, dataCenter));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.ericsson.bss.cassandra.ecchronos.core.impl.locks;

import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockContentionException;
import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
Expand Down Expand Up @@ -76,6 +77,10 @@ public DistributedLock getLock(final String dataCenter,
{
return myLockSupplier.getLock(dataCenter, resource, priority, metadata);
}
catch (LockContentionException e)
{
throw e;
}
catch (LockException e)
{
myFailureCache.put(lockKey, e);
Expand All @@ -85,7 +90,7 @@ public DistributedLock getLock(final String dataCenter,

private void throwCachedLockException(final LockException e) throws LockException
{
LOG.debug("Encountered cached locking failure, throwing exception", e);
LOG.info("[DIAG] Lock blocked by cached failure: {}", e.getMessage());
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the idea with these [DIAG] log entries? Are they temporary for this PR? I mean, going from DEBUG to INFO and attaching a DIAG tag...

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use e.getMessage(). It only prints the message; it doesn't tell you what the exception was. Remove the {} placeholder and pass just e, the way it was before — that will give you the exception type, the message, and the stack trace.

throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ private void allTableOperation(
final String keyspaceName,
final BiConsumer<TableReference, TableMetadata> consumer)
{
for (TableMetadata tableMetadata : Metadata.getKeyspace(mySession, keyspaceName).get().getTables().values())
{
String tableName = tableMetadata.getName().asInternal();
TableReference tableReference = myTableReferenceFactory.forTable(keyspaceName, tableName);

consumer.accept(tableReference, tableMetadata);
}
Metadata.getKeyspace(mySession, keyspaceName).get().getTables().values().parallelStream()
.forEach(tableMetadata ->
{
String tableName = tableMetadata.getName().asInternal();
TableReference tableReference = myTableReferenceFactory.forTable(keyspaceName, tableName);
consumer.accept(tableReference, tableMetadata);
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.AbstractMap;
import java.util.Collection;

import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -68,11 +67,12 @@
public final class RepairSchedulerImpl implements RepairScheduler, Closeable
{
private static final int DEFAULT_TERMINATION_WAIT_IN_SECONDS = 10;
private static final int DEFAULT_THREAD_POOL_SIZE = 4;

private static final Logger LOG = LoggerFactory.getLogger(RepairSchedulerImpl.class);

private final Map<UUID, Map<TableReference, Set<ScheduledRepairJob>>> myScheduledJobs = new ConcurrentHashMap<>();
private final Object myLock = new Object();
private final ConcurrentHashMap<UUID, Object> myNodeLocks = new ConcurrentHashMap<>();

private final ExecutorService myExecutor;
private final TableRepairMetrics myTableRepairMetrics;
Expand Down Expand Up @@ -107,7 +107,8 @@ private Set<ScheduledRepairJob> validateScheduleMap(final UUID nodeID, final Tab

private RepairSchedulerImpl(final Builder builder)
{
myExecutor = Executors.newSingleThreadScheduledExecutor(
int poolSize = Math.max(DEFAULT_THREAD_POOL_SIZE, builder.myThreadPoolSize);
myExecutor = Executors.newFixedThreadPool(poolSize,
new ThreadFactoryBuilder().setNameFormat("RepairScheduler-%d").build());
myFaultReporter = builder.myFaultReporter;
myTableRepairMetrics = builder.myTableRepairMetrics;
Expand All @@ -129,6 +130,11 @@ public String getCurrentJobStatus()
return myScheduleManager.getCurrentJobStatus();
}

private Object getNodeLock(final UUID nodeID)
{
    // One lock object per node, created lazily, so synchronization is scoped
    // per node instead of using a single global lock.
    Object nodeLock = myNodeLocks.get(nodeID);
    if (nodeLock == null)
    {
        nodeLock = myNodeLocks.computeIfAbsent(nodeID, id -> new Object());
    }
    return nodeLock;
}

@Override
public void close()
{
Expand All @@ -146,18 +152,16 @@ public void close()
Thread.currentThread().interrupt();
}

synchronized (myLock)
myScheduledJobs.forEach((nodeID, tableJobs) ->
{
myScheduledJobs.entrySet().stream()
.flatMap(nodeEntry -> nodeEntry.getValue().entrySet().stream()
.flatMap(tableEntry -> tableEntry.getValue().stream()
.map(job -> new AbstractMap.SimpleEntry<>(nodeEntry.getKey(), job))
)
)
.forEach(entry -> descheduleTableJob(entry.getKey(), entry.getValue()));

myScheduledJobs.clear();
}
synchronized (getNodeLock(nodeID))
{
tableJobs.values().stream()
.flatMap(Set::stream)
.forEach(job -> descheduleTableJob(nodeID, job));
}
});
myScheduledJobs.clear();
}

@Override
Expand All @@ -178,28 +182,31 @@ public void removeConfiguration(final Node node, final TableReference tableRefer
@Override
public List<ScheduledRepairJobView> getCurrentRepairJobs()
{
synchronized (myLock)
{
return myScheduledJobs.values().stream()
.flatMap(tableJobs -> tableJobs.values().stream())
.flatMap(Set::stream)
.map(ScheduledRepairJob::getView)
.collect(Collectors.toList());
}
return myScheduledJobs.entrySet().stream()
.flatMap(nodeEntry ->
{
synchronized (getNodeLock(nodeEntry.getKey()))
{
return nodeEntry.getValue().values().stream()
.flatMap(Set::stream)
.map(ScheduledRepairJob::getView)
.collect(Collectors.toList())
.stream();
}
})
.collect(Collectors.toList());
}

@Override
public List<ScheduledRepairJobView> getCurrentRepairJobsByNode(final UUID nodeId)
{
synchronized (myLock)
Map<TableReference, Set<ScheduledRepairJob>> tableJobs = myScheduledJobs.get(nodeId);
if (tableJobs == null)
{
return Collections.emptyList();
}
synchronized (getNodeLock(nodeId))
{
Map<TableReference, Set<ScheduledRepairJob>> tableJobs = myScheduledJobs.get(nodeId);

if (tableJobs == null)
{
return Collections.emptyList();
}

return tableJobs.values().stream()
.flatMap(Set::stream)
.map(ScheduledRepairJob::getView)
Expand All @@ -213,7 +220,7 @@ private void handleTableConfigurationChange(
final TableReference tableReference,
final Set<RepairConfiguration> repairConfigurations)
{
synchronized (myLock)
synchronized (getNodeLock(node.getHostId()))
{
try
{
Expand Down Expand Up @@ -311,7 +318,7 @@ private void createTableSchedule(

private void tableConfigurationRemoved(final Node node, final TableReference tableReference)
{
synchronized (myLock)
synchronized (getNodeLock(node.getHostId()))
{
try
{
Expand All @@ -336,7 +343,7 @@ public void removeAllConfigurationsForNode(final UUID nodeId)

private void nodeConfigurationRemoved(final UUID nodeId)
{
synchronized (myLock)
synchronized (getNodeLock(nodeId))
{
try
{
Expand Down Expand Up @@ -448,6 +455,7 @@ public static class Builder
private TableStorageStates myTableStorageStates;
private RepairLockType myRepairLockType;
private TimeBasedRunPolicy myTimeBasedRunPolicy;
private int myThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;

/**
* RepairSchedulerImpl build with repair lock type.
Expand Down Expand Up @@ -593,6 +601,18 @@ public Builder withTimeBasedRunPolicy(final TimeBasedRunPolicy timeBasedRunPolic
return this;
}

/**
 * Configures the size of the thread pool used for parallel
 * configuration processing.
 *
 * @param threadPoolSize The number of threads to use.
 * @return This builder instance, for call chaining.
 */
public Builder withThreadPoolSize(final int threadPoolSize)
{
    this.myThreadPoolSize = threadPoolSize;
    return this;
}

/**
* RepairSchedulerImpl build.
*
Expand Down
Loading
Loading