diff --git a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java index 0b6ddd704..bc8ef4c23 100644 --- a/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java +++ b/application/src/main/java/com/ericsson/bss/cassandra/ecchronos/application/spring/ECChronos.java @@ -121,6 +121,7 @@ public ECChronos(//NOPMD long parameter list .withTableStorageStates(myECChronosInternals.getTableStorageStates()) .withRepairLockType(configuration.getRepairConfig().getRepairLockType()) .withTimeBasedRunPolicy(myTimeBasedRunPolicy) + .withThreadPoolSize(nativeConnectionProvider.getNodes().size()) .build(); AbstractRepairConfigurationProvider repairConfigurationProvider = new FileBasedRepairConfiguration(applicationContext); diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/JolokiaNotificationController.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/JolokiaNotificationController.java index 8126e4342..bcca6b32b 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/JolokiaNotificationController.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/jmx/JolokiaNotificationController.java @@ -63,8 +63,7 @@ public class JolokiaNotificationController private final Map myJolokiaRelationshipListeners = new HashMap<>(); - private final ScheduledExecutorService myNotificationExecutor = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("NotificationRefresher-%d").build()); + private final ScheduledExecutorService myNotificationExecutor; private final Map> myClientIdMap = new ConcurrentHashMap<>(); private final Map> myNodeListenersMap = new ConcurrentHashMap<>(); @@ -91,6 +90,10 @@ public JolokiaNotificationController(final Builder builder) 
myRunDelay = builder.myRunDelay; myIpTranslator = builder.myIpTranslator; myCertificateHandler = builder.myCertificateHandler; + int nodeCount = myNativeConnectionProvider.getNodes().size(); + myNotificationExecutor = Executors.newScheduledThreadPool( + Math.max(nodeCount, 1), + new ThreadFactoryBuilder().setNameFormat("NotificationRefresher-%d").setDaemon(true).build()); } private HttpClient buildHttpClient() diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java index 9f33f7a94..f57ae0c52 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/CASLockFactory.java @@ -26,6 +26,7 @@ import com.ericsson.bss.cassandra.ecchronos.connection.DistributedNativeConnectionProvider; import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory; import com.ericsson.bss.cassandra.ecchronos.core.state.HostStates; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockContentionException; import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -88,10 +89,12 @@ public final class CASLockFactory implements LockFactory, Closeable CASLockFactory(final CASLockFactoryBuilder builder) { + Map nodes = builder.getNativeConnectionProvider().getNodes(); + int nodeCount = nodes != null ? 
Math.max(1, nodes.size()) : 1; myCasLockProperties = new CASLockProperties( builder.getNativeConnectionProvider().getConnectionType(), builder.getKeyspaceName(), - Executors.newSingleThreadScheduledExecutor( + Executors.newScheduledThreadPool(nodeCount, new ThreadFactoryBuilder().setNameFormat("LockRefresher-%d").build()), builder.getConsistencyType(), builder.getNativeConnectionProvider().getCqlSession()); @@ -293,7 +296,7 @@ private DistributedLock doTryLock(final String dataCenter, } else { - throw new LockException(String.format("Unable to lock resource %s in datacenter %s", resource, dataCenter)); + throw new LockContentionException(String.format("Unable to lock resource %s in datacenter %s", resource, dataCenter)); } } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java index 5c46f8d14..5322ee3fb 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/locks/LockCache.java @@ -15,6 +15,7 @@ package com.ericsson.bss.cassandra.ecchronos.core.impl.locks; import com.ericsson.bss.cassandra.ecchronos.core.locks.LockFactory.DistributedLock; +import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockContentionException; import com.ericsson.bss.cassandra.ecchronos.utils.exceptions.LockException; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -76,6 +77,10 @@ public DistributedLock getLock(final String dataCenter, { return myLockSupplier.getLock(dataCenter, resource, priority, metadata); } + catch (LockContentionException e) + { + throw e; + } catch (LockException e) { myFailureCache.put(lockKey, e); @@ -85,7 +90,7 @@ public DistributedLock getLock(final String dataCenter, private void throwCachedLockException(final LockException e) throws 
LockException { - LOG.debug("Encountered cached locking failure, throwing exception", e); + LOG.debug("Lock blocked by cached failure: {}", e.getMessage(), e); throw e; } diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/multithreads/NodeWorker.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/multithreads/NodeWorker.java index c952aeea0..2f1503198 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/multithreads/NodeWorker.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/multithreads/NodeWorker.java @@ -208,13 +208,13 @@ private void allTableOperation( final String keyspaceName, final BiConsumer consumer) { - for (TableMetadata tableMetadata : Metadata.getKeyspace(mySession, keyspaceName).get().getTables().values()) - { - String tableName = tableMetadata.getName().asInternal(); - TableReference tableReference = myTableReferenceFactory.forTable(keyspaceName, tableName); - - consumer.accept(tableReference, tableMetadata); - } + Metadata.getKeyspace(mySession, keyspaceName).get().getTables().values().parallelStream() + .forEach(tableMetadata -> + { + String tableName = tableMetadata.getName().asInternal(); + TableReference tableReference = myTableReferenceFactory.forTable(keyspaceName, tableName); + consumer.accept(tableReference, tableMetadata); + }); } /** diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/RepairSchedulerImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/RepairSchedulerImpl.java index 022145870..b527a3254 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/RepairSchedulerImpl.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/RepairSchedulerImpl.java @@ -49,7 +49,6 @@ import java.util.HashMap; import java.util.List; import 
java.util.ArrayList; -import java.util.AbstractMap; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; @@ -68,11 +67,12 @@ public final class RepairSchedulerImpl implements RepairScheduler, Closeable { private static final int DEFAULT_TERMINATION_WAIT_IN_SECONDS = 10; + private static final int DEFAULT_THREAD_POOL_SIZE = 4; private static final Logger LOG = LoggerFactory.getLogger(RepairSchedulerImpl.class); private final Map>> myScheduledJobs = new ConcurrentHashMap<>(); - private final Object myLock = new Object(); + private final ConcurrentHashMap myNodeLocks = new ConcurrentHashMap<>(); private final ExecutorService myExecutor; private final TableRepairMetrics myTableRepairMetrics; @@ -107,7 +107,8 @@ private Set validateScheduleMap(final UUID nodeID, final Tab private RepairSchedulerImpl(final Builder builder) { - myExecutor = Executors.newSingleThreadScheduledExecutor( + int poolSize = Math.max(DEFAULT_THREAD_POOL_SIZE, builder.myThreadPoolSize); + myExecutor = Executors.newFixedThreadPool(poolSize, new ThreadFactoryBuilder().setNameFormat("RepairScheduler-%d").build()); myFaultReporter = builder.myFaultReporter; myTableRepairMetrics = builder.myTableRepairMetrics; @@ -129,6 +130,11 @@ public String getCurrentJobStatus() return myScheduleManager.getCurrentJobStatus(); } + private Object getNodeLock(final UUID nodeID) + { + return myNodeLocks.computeIfAbsent(nodeID, k -> new Object()); + } + @Override public void close() { @@ -146,18 +152,16 @@ public void close() Thread.currentThread().interrupt(); } - synchronized (myLock) + myScheduledJobs.forEach((nodeID, tableJobs) -> { - myScheduledJobs.entrySet().stream() - .flatMap(nodeEntry -> nodeEntry.getValue().entrySet().stream() - .flatMap(tableEntry -> tableEntry.getValue().stream() - .map(job -> new AbstractMap.SimpleEntry<>(nodeEntry.getKey(), job)) - ) - ) - .forEach(entry -> descheduleTableJob(entry.getKey(), entry.getValue())); - - myScheduledJobs.clear(); - } + synchronized 
(getNodeLock(nodeID)) + { + tableJobs.values().stream() + .flatMap(Set::stream) + .forEach(job -> descheduleTableJob(nodeID, job)); + } + }); + myScheduledJobs.clear(); } @Override @@ -178,28 +182,31 @@ public void removeConfiguration(final Node node, final TableReference tableRefer @Override public List getCurrentRepairJobs() { - synchronized (myLock) - { - return myScheduledJobs.values().stream() - .flatMap(tableJobs -> tableJobs.values().stream()) - .flatMap(Set::stream) - .map(ScheduledRepairJob::getView) - .collect(Collectors.toList()); - } + return myScheduledJobs.entrySet().stream() + .flatMap(nodeEntry -> + { + synchronized (getNodeLock(nodeEntry.getKey())) + { + return nodeEntry.getValue().values().stream() + .flatMap(Set::stream) + .map(ScheduledRepairJob::getView) + .collect(Collectors.toList()) + .stream(); + } + }) + .collect(Collectors.toList()); } @Override public List getCurrentRepairJobsByNode(final UUID nodeId) { - synchronized (myLock) + Map> tableJobs = myScheduledJobs.get(nodeId); + if (tableJobs == null) + { + return Collections.emptyList(); + } + synchronized (getNodeLock(nodeId)) { - Map> tableJobs = myScheduledJobs.get(nodeId); - - if (tableJobs == null) - { - return Collections.emptyList(); - } - return tableJobs.values().stream() .flatMap(Set::stream) .map(ScheduledRepairJob::getView) @@ -213,7 +220,7 @@ private void handleTableConfigurationChange( final TableReference tableReference, final Set repairConfigurations) { - synchronized (myLock) + synchronized (getNodeLock(node.getHostId())) { try { @@ -311,7 +318,7 @@ private void createTableSchedule( private void tableConfigurationRemoved(final Node node, final TableReference tableReference) { - synchronized (myLock) + synchronized (getNodeLock(node.getHostId())) { try { @@ -336,7 +343,7 @@ public void removeAllConfigurationsForNode(final UUID nodeId) private void nodeConfigurationRemoved(final UUID nodeId) { - synchronized (myLock) + synchronized (getNodeLock(nodeId)) { try { @@ -448,6 
+455,7 @@ public static class Builder private TableStorageStates myTableStorageStates; private RepairLockType myRepairLockType; private TimeBasedRunPolicy myTimeBasedRunPolicy; + private int myThreadPoolSize = DEFAULT_THREAD_POOL_SIZE; /** * RepairSchedulerImpl build with repair lock type. @@ -593,6 +601,18 @@ public Builder withTimeBasedRunPolicy(final TimeBasedRunPolicy timeBasedRunPolic return this; } + /** + * RepairSchedulerImpl build with thread pool size. + * + * @param threadPoolSize Thread pool size for parallel configuration processing. + * @return Builder + */ + public Builder withThreadPoolSize(final int threadPoolSize) + { + myThreadPoolSize = threadPoolSize; + return this; + } + /** * RepairSchedulerImpl build. * diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java index 07177a2b1..bd80f099c 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduleManagerImpl.java @@ -28,11 +28,15 @@ import java.util.Set; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; @@ -45,6 +49,7 @@ /** * ScheduleManager handles the run scheduler and update 
scheduler. */ +@SuppressWarnings("PMD.GodClass") public final class ScheduleManagerImpl implements ScheduleManager, Closeable { private static final Logger LOG = LoggerFactory.getLogger(ScheduleManagerImpl.class); @@ -53,91 +58,115 @@ public final class ScheduleManagerImpl implements ScheduleManager, Closeable private final Map myQueue = new ConcurrentHashMap<>(); private final Collection myNodeIDList; - private final AtomicReference currentExecutingJob = new AtomicReference<>(); + private final Map currentExecutingJobs = new ConcurrentHashMap<>(); private final Set myRunPolicies = Sets.newConcurrentHashSet(); - private final Map> myRunFuture = new ConcurrentHashMap<>(); - private final Map myRunTasks = new ConcurrentHashMap<>(); + private final Set myRegisteredNodes = Sets.newConcurrentHashSet(); private final CASLockFactory myLockFactory; private final DistributedNativeConnectionProvider myNativeConnectionProvider; - private final ScheduledThreadPoolExecutor myExecutor; + private final BlockingQueue myWorkQueue = new LinkedBlockingQueue<>(); + private final ExecutorService myWorkerPool; + private final ScheduledExecutorService myProducer; + private ScheduledFuture myProducerFuture; private final long myRunIntervalInMs; private ScheduleManagerImpl(final Builder builder) { myNodeIDList = builder.myNodeIDList; myNativeConnectionProvider = builder.myNativeConnectionProvider; - myExecutor = new ScheduledThreadPoolExecutor( - myNodeIDList.size(), new ThreadFactoryBuilder().setNameFormat("TaskExecutor-%d").build()); + myWorkerPool = Executors.newFixedThreadPool( + myNodeIDList.size(), + new ThreadFactoryBuilder().setNameFormat("TaskExecutor-%d").build()); + myProducer = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("TaskProducer").setDaemon(true).build()); myLockFactory = builder.myLockFactory; myRunIntervalInMs = builder.myRunIntervalInMs; } /** - * Create a ScheduledFuture for each of the nodes in the nodeIDList. 
+ * Create workers for each of the nodes in the nodeIDList and start the producer. * @param nodeIDList */ @Override public void createScheduleFutureForNodeIDList(final Collection nodeIDList) { - myExecutor.setCorePoolSize(nodeIDList.size()); LOG.debug("Total nodes found: {}", nodeIDList.size()); - for (UUID nodeID : nodeIDList) - { - if (myRunTasks.get(nodeID) == null) - { - JobRunTask myRunTask = new JobRunTask(nodeID); - ScheduledFuture scheduledFuture = myExecutor.scheduleWithFixedDelay(myRunTask, - myRunIntervalInMs, - myRunIntervalInMs, - TimeUnit.MILLISECONDS); - myRunTasks.put(nodeID, myRunTask); - myRunFuture.put(nodeID, scheduledFuture); - LOG.debug("JobRunTask created for node {}", nodeID); - } - } + nodeIDList.forEach(this::registerNode); + startProducer(); + startWorkers(); } /** - * Create a ScheduledFuture for the nodeID. + * Register a new node for scheduling. * @param nodeID */ @Override public void createScheduleFutureForNode(final UUID nodeID) { - if (myRunTasks.get(nodeID) == null) + registerNode(nodeID); + } + + private void registerNode(final UUID nodeID) + { + if (myRegisteredNodes.add(nodeID)) { - JobRunTask myRunTask = new JobRunTask(nodeID); - myExecutor.setCorePoolSize(myRunTasks.size()); - - ScheduledFuture scheduledFuture = myExecutor.scheduleWithFixedDelay(myRunTask, - myRunIntervalInMs, - myRunIntervalInMs, - TimeUnit.MILLISECONDS); - myRunTasks.put(nodeID, myRunTask); - myRunFuture.put(nodeID, scheduledFuture); - myExecutor.setCorePoolSize(myRunTasks.size()); - LOG.debug("JobRunTask created for new node {}", nodeID); + LOG.debug("Node {} registered for scheduling", nodeID); } - else + } + + private void startProducer() + { + if (myProducerFuture == null) { - LOG.debug("JobRunTask already exists for new node {}", nodeID); + myProducerFuture = myProducer.scheduleAtFixedRate( + this::produceWork, 0, myRunIntervalInMs, TimeUnit.MILLISECONDS); } } - @Override - public String getCurrentJobStatus() + + private void startWorkers() { - 
ScheduledJob job = currentExecutingJob.get(); - if (job != null) + for (int i = 0; i < myRegisteredNodes.size(); i++) { - return job.getJobId().toString(); + myWorkerPool.submit(this::workerLoop); } - else + } + + private void produceWork() + { + for (UUID nodeID : myRegisteredNodes) { - return ""; + myWorkQueue.offer(nodeID); } } + private void workerLoop() + { + while (!Thread.currentThread().isInterrupted()) + { + try + { + UUID nodeID = myWorkQueue.take(); + runForNode(nodeID); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + break; + } + catch (Exception e) + { + LOG.error("Exception in worker loop", e); + } + } + } + @Override + public String getCurrentJobStatus() + { + return currentExecutingJobs.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue().getJobId()) + .collect(Collectors.joining("\n")); + } + /** * Adds a run policy to the collection of run policies. * @@ -185,11 +214,12 @@ public void deschedule(final UUID nodeID, final ScheduledJob job) @Override public void close() { - for (ScheduledFuture future : myRunFuture.values()) + if (myProducerFuture != null) { - future.cancel(false); + myProducerFuture.cancel(false); } - myExecutor.shutdown(); + myProducer.shutdown(); + myWorkerPool.shutdownNow(); myRunPolicies.clear(); } @@ -201,7 +231,7 @@ public void close() @VisibleForTesting public void run(final UUID nodeID) { - myRunTasks.get(nodeID).run(); + runForNode(nodeID); } /** @@ -232,148 +262,128 @@ private Long validateJob(final ScheduledJob job, final Node node) } - /** - * Internal run task that is scheduled by the {@link ScheduleManagerImpl}. - *

- * Retrieves a job from the queue and tries to run it provided that it's possible to get the required locks. - */ - private final class JobRunTask implements Runnable + private void runForNode(final UUID nodeID) { - private final UUID nodeID; - private final Node myNode; - - private JobRunTask(final UUID currentNodeID) + try { - nodeID = currentNodeID; - myNode = myNativeConnectionProvider.getNodes().get(nodeID); - } - - @Override - public void run() - { - try + LOG.debug("Running for Node {}", nodeID); + ScheduledJobQueue queue = myQueue.get(nodeID); + if (queue != null) { - LOG.debug("In JobRunTask.run for Node {}", nodeID); - if (myQueue.get(nodeID) != null) - { - tryRunNext(); - } - else + boolean didWork = tryRunNext(nodeID, queue); + if (didWork) { - LOG.info("There is no ScheduledJob for this node {} to run ", nodeID); + myWorkQueue.offer(nodeID); } - } - catch (Exception e) + else { - LOG.error("Exception while running job in node {}", nodeID, e); + LOG.info("There is no ScheduledJob for this node {} to run", nodeID); } } + catch (Exception e) + { + LOG.error("Exception while running job in node {}", nodeID, e); + } + } - private void tryRunNext() + private boolean tryRunNext(final UUID nodeID, final ScheduledJobQueue queue) + { + Node node = myNativeConnectionProvider.getNodes().get(nodeID); + LOG.debug("Looking for Job for Node {}", nodeID); + boolean jobRan = false; + for (ScheduledJob next : queue) { - LOG.debug("Looking for Job for Node {}", nodeID); - for (ScheduledJob next : myQueue.get(nodeID)) + if (validate(next, node)) { - if (validate(next)) + currentExecutingJobs.put(nodeID, next); + if (tryRunTasks(nodeID, node, next)) { - currentExecutingJob.set(next); - if (tryRunTasks(next)) - { - break; - } + jobRan = true; + break; } } - currentExecutingJob.set(null); } + currentExecutingJobs.remove(nodeID); + return jobRan; + } - private boolean validate(final ScheduledJob job) + private boolean validate(final ScheduledJob job, final Node node) + { + 
LOG.trace("Validating job {}", job); + long nextRun = validateJob(job, node); + if (nextRun != -1L) { - LOG.trace("Validating job {}", job); - long nextRun = validateJob(job, myNode); - - if (nextRun != -1L) - { - job.setRunnableIn(nextRun); - return false; - } - - return true; + job.setRunnableIn(nextRun); + return false; } + return true; + } - private boolean tryRunTasks( - final ScheduledJob next) + private boolean tryRunTasks(final UUID nodeID, final Node node, final ScheduledJob next) + { + boolean hasRun = false; + for (ScheduledTask task : next) { - boolean hasRun = false; - - for (ScheduledTask task : next) + if (!validate(next, node)) { - if (!validate(next)) - { - LOG.debug("Job {} was stopped, will continue later", next); - break; - } - hasRun |= tryRunTask(next, task); + LOG.debug("Job {} was stopped, will continue later", next); + break; } - - return hasRun; + hasRun |= tryRunTask(nodeID, next, task); } + return hasRun; + } - private boolean tryRunTask( - final ScheduledJob job, - final ScheduledTask task) + private boolean tryRunTask(final UUID nodeID, final ScheduledJob job, final ScheduledTask task) + { + LOG.debug("Trying to run task {} in node {}", task, nodeID); + try (LockFactory.DistributedLock lock = task.getLock(myLockFactory, nodeID)) { - LOG.debug("Trying to run task {} in node {}", task, nodeID); - LOG.debug("Trying to acquire lock for {}", task); - try (LockFactory.DistributedLock lock = task.getLock(myLockFactory, nodeID)) - { - LOG.debug("Lock has been acquired on node with Id {} with lock {}", nodeID, lock); - boolean successful = runTask(task); - job.postExecute(successful, task); - return true; - } - catch (RuntimeMBeanException e) + LOG.debug("Lock acquired on node {} with lock {}", nodeID, lock); + boolean successful = runTask(nodeID, task); + job.postExecute(successful, task); + return true; + } + catch (RuntimeMBeanException e) + { + if (e.getCause() instanceof IllegalStateException + && e.getCause().getMessage() != null + && 
e.getCause().getMessage().contains("More than one key found")) { - if (e.getCause() instanceof IllegalStateException - && e.getCause().getMessage() != null - && e.getCause().getMessage().contains("More than one key found")) - { - LOG.debug("Unable to get schedule lock on task {} in node {}, this is probably due to " - + "a connection to a version of the Jolokia Agent 2.3.0 or older", task, nodeID, e); - } - else - { - LOG.warn("Unable to get schedule lock on task {} in node {}", task, nodeID, e); - } - return false; + LOG.debug("Unable to get schedule lock on task {} in node {}, this is probably due to " + + "a connection to a version of the Jolokia Agent 2.3.0 or older", task, nodeID, e); } - catch (Exception e) + else { - if (e.getCause() != null) - { - LOG.warn("Unable to get schedule lock on task {} in node {}", task, nodeID, e); - } - return false; + LOG.warn("Unable to get schedule lock on task {} in node {}", task, nodeID, e); } + return false; } - - private boolean runTask( - final ScheduledTask task) + catch (Exception e) { - try + if (e.getCause() != null) { - LOG.info("Running task: {}, for node {}", task, nodeID); - return task.execute(nodeID); + LOG.warn("Unable to get schedule lock on task {} in node {}", task, nodeID, e); } - catch (Exception e) - { - LOG.warn("Unable to run task: {} in node: {}", task, nodeID, e); - } - return false; } } + private boolean runTask(final UUID nodeID, final ScheduledTask task) + { + try + { + LOG.info("Running task: {}, for node {}", task, nodeID); + return task.execute(nodeID); + } + catch (Exception e) + { + LOG.warn("Unable to run task: {} in node: {}", task, nodeID, e); + } + return false; + } + /** * Create an instance of Builder to construct ScheduleManagerImpl. 
* diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduledJobQueue.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduledJobQueue.java index 951965919..4f10d67d0 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduledJobQueue.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/ScheduledJobQueue.java @@ -121,9 +121,7 @@ final int size() @Override public final synchronized Iterator iterator() { - myJobQueues.values().forEach(q -> q.forEach(ScheduledJob::refreshState)); Iterator baseIterator = new ManyToOneIterator<>(myJobQueues.values(), myComparator); - return new RunnableJobIterator(baseIterator); } @@ -142,7 +140,7 @@ protected ScheduledJob computeNext() { while (myBaseIterator.hasNext()) { ScheduledJob job = myBaseIterator.next(); - + job.refreshState(); ScheduledJob.State state = job.getState(); if (state == ScheduledJob.State.FAILED || state == ScheduledJob.State.FINISHED) { diff --git a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java index eddfdad2e..6caeaaf56 100644 --- a/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java +++ b/core.impl/src/main/java/com/ericsson/bss/cassandra/ecchronos/core/impl/table/TableRepairJob.java @@ -130,10 +130,16 @@ private ScheduledRepairJobView.Status getStatus(final long timestamp) if (msSinceLastRepair >= config.getRepairErrorTimeInMs()) { + LOG.warn("{} on node {} is OVERDUE. 
Last repaired {}ms ago (error threshold: {}ms, interval: {}ms)", + getTableReference(), getNodeId(), msSinceLastRepair, + config.getRepairErrorTimeInMs(), config.getRepairIntervalInMs()); return ScheduledRepairJobView.Status.OVERDUE; } if (msSinceLastRepair >= config.getRepairWarningTimeInMs()) { + LOG.warn("{} on node {} is LATE. Last repaired {}ms ago (warn threshold: {}ms, interval: {}ms)", + getTableReference(), getNodeId(), msSinceLastRepair, + config.getRepairWarningTimeInMs(), config.getRepairIntervalInMs()); return ScheduledRepairJobView.Status.LATE; } if (msSinceLastRepair >= (config.getRepairIntervalInMs() - getRunOffset())) diff --git a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java index 38305cbdd..47ac5f584 100644 --- a/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java +++ b/core.impl/src/test/java/com/ericsson/bss/cassandra/ecchronos/core/impl/repair/scheduler/TestScheduleManager.java @@ -217,7 +217,7 @@ public void testGetCurrentJobStatus() throws InterruptedException myScheduler.schedule(nodeID1, job1); new Thread(() -> myScheduler.run(nodeID1)).start(); Thread.sleep(50); - assertThat(myScheduler.getCurrentJobStatus()).isEqualTo(jobId.toString()); + assertThat(myScheduler.getCurrentJobStatus()).isEqualTo(nodeID1 + "=" + jobId); latch.countDown(); } diff --git a/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/exceptions/LockContentionException.java b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/exceptions/LockContentionException.java new file mode 100644 index 000000000..befdcd300 --- /dev/null +++ b/utils/src/main/java/com/ericsson/bss/cassandra/ecchronos/utils/exceptions/LockContentionException.java @@ -0,0 +1,29 @@ +/* + * Copyright 2026 Telefonaktiebolaget LM Ericsson 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.ericsson.bss.cassandra.ecchronos.utils.exceptions; + +/** + * Exception thrown when a lock cannot be acquired due to contention with another node. + * Unlike regular LockException, this should not be cached since the contention is transient. + */ +public class LockContentionException extends LockException +{ + private static final long serialVersionUID = 2799812379389641955L; + + public LockContentionException(final String message) + { + super(message); + } +}