Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import it.unibo.alchemist.model.positions.Euclidean2DPosition;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -52,11 +55,11 @@ void testConcurrentGetContentsAndModification() throws InterruptedException {
final int numberOfOperations = 100;
final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
final AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
final List<Future<?>> tasks = new ArrayList<>(numberOfThreads);
// Start threads that modify the node while others read from it
for (int i = 0; i < numberOfThreads; i++) {
final int threadId = i;
executor.submit(() -> {
tasks.add(executor.submit(() -> {
try {
for (int j = 0; j < numberOfOperations; j++) {
if (threadId % 2 == 0) {
Expand All @@ -69,7 +72,7 @@ void testConcurrentGetContentsAndModification() throws InterruptedException {
assertNotNull(lsaSpace);
} else {
// Writer threads
final ILsaMolecule newMolecule = new LsaMolecule("thread" + threadId + "_" + j);
final ILsaMolecule newMolecule = new LsaMolecule("thread" + threadId + "x" + j);
node.setConcentration(newMolecule);
// Sometimes remove molecules to simulate real concurrent modification
if (j % 10 == 0 && node.getMoleculeCount() > MIN_MOLECULES) {
Expand All @@ -81,19 +84,57 @@ void testConcurrentGetContentsAndModification() throws InterruptedException {
}
}
}
} catch (final IllegalStateException | java.util.ConcurrentModificationException e) {
exceptionOccurred.set(true);
e.printStackTrace();
} finally {
latch.countDown();
}
});
return null;
}));
}
// Wait for all threads to complete
assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), "Test should complete within 30 seconds");
executor.shutdown();
// No exceptions should have occurred (especially no ConcurrentModificationException)
assertFalse(exceptionOccurred.get(), "No exceptions should occur during concurrent access");
AssertionError primaryFailure = null;
try {
assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), "Test should complete within " + TIMEOUT_SECONDS + " seconds");
for (final Future<?> task : tasks) {
try {
task.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Test interrupted while waiting for task", e);
} catch (final ExecutionException e) {
throw new AssertionError("Task failed with exception", e.getCause());
} catch (final TimeoutException e) {
task.cancel(true);
throw new AssertionError("Task timed out after " + TIMEOUT_SECONDS + " seconds", e);
}
}
} catch (final AssertionError e) {
primaryFailure = e;
} finally {
executor.shutdownNow();
Comment thread
DanySK marked this conversation as resolved.
try {
if (!executor.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
final AssertionError terminationError = new AssertionError(
"Executor did not terminate within " + TIMEOUT_SECONDS + " seconds");
if (primaryFailure != null) {
primaryFailure.addSuppressed(terminationError);
} else {
throw terminationError;
}
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
final AssertionError terminationError = new AssertionError(
"Interrupted while waiting for executor termination", e);
if (primaryFailure != null) {
primaryFailure.addSuppressed(terminationError);
} else {
throw terminationError;
}
}
Comment thread
DanySK marked this conversation as resolved.
}
if (primaryFailure != null) {
throw primaryFailure;
}
// Verify the node is still in a valid state
final Map<Molecule, List<ILsaMolecule>> finalContents = node.getContents();
assertNotNull(finalContents);
Expand Down
Loading