Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
Expand Down Expand Up @@ -73,7 +72,7 @@ public class TarExtractor {
public TarExtractor(int threadPoolSize, String threadNamePrefix) {
this.threadPoolSize = threadPoolSize;
this.threadFactory =
new ThreadFactoryBuilder().setNameFormat("FetchOMDBTar-%d" + threadNamePrefix)
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "FetchOMDBTar-%d")
.build();
}

Expand Down Expand Up @@ -163,8 +162,7 @@ private void writeFile(Path outputDir, String fileName, byte[] fileData) {

public void start() {
if (executorServiceStarted.compareAndSet(false, true)) {
this.executor =
new ThreadPoolExecutor(0, threadPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
this.executor = Executors.newFixedThreadPool(threadPoolSize, threadFactory);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: The pool size is Math.max(64, Runtime.getRuntime().availableProcessors()) at the call site in OzoneManagerServiceProviderImpl. On high-CPU machines this could be large. With the old code it barely mattered; now it will actually create that many threads. On a 256-core host that's 256 threads all blocking on I/O at the same time. Consider documenting or probably IMO we should be capping this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have switched from max to min. We should now be capped to 64 threads.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -61,11 +62,9 @@
public class DataNodeMetricsService {

private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class);
private static final int MAX_POOL_SIZE = 500;
private static final int KEEP_ALIVE_TIME = 5;
private static final int POLL_INTERVAL_MS = 200;

private final ThreadPoolExecutor executorService;
private final ExecutorService executorService;
private final ReconNodeManager reconNodeManager;
private final boolean httpsEnabled;
private final int minimumApiDelayMs;
Expand Down Expand Up @@ -97,13 +96,10 @@ public DataNodeMetricsService(
this.metricsServiceProviderFactory = metricsServiceProviderFactory;
this.lastCollectionEndTime.set(-minimumApiDelayMs);
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
this.executorService = new ThreadPoolExecutor(
corePoolSize, MAX_POOL_SIZE,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setNameFormat("DataNodeMetricsCollector-%d")
.build());
ThreadFactory threadFactory = new ThreadFactoryBuilder()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original pool was intended (if misconfigured) to scale up to 500 threads under load (e.g., Recon polling thousands of DataNodes simultaneously). The fix correctly reflects actual behavior today, but it also avoiding efficient future scaling. If the intent was truly to allow bursting to a larger pool, can we think of using a bounded queue rather than simply removing it. At minimum, a comment explaining the corePoolSize choice and impact on removing the MAX_POOL_SIZE = 500.

Bu wait, this could make it slower in large cluster having hundreds of datanodes where it makes concurrent DataNode queries. The original MAX_POOL_SIZE = 500 captured the right intent — it was just broken by the unbounded LinkedBlockingQueue. A better fix would be to make the pool size configurable with a sensible default:

// e.g., add a config key OZONE_RECON_DN_METRICS_THREAD_COUNT, default 128 or min(nodeCount, 500)
int poolSize = config.getInt(OZONE_RECON_DN_METRICS_THREAD_COUNT, 128);
this.executorService = Executors.newFixedThreadPool(poolSize, threadFactory);

.setNameFormat("DataNodeMetricsCollector-%d")
.build();
this.executorService = Executors.newFixedThreadPool(corePoolSize, threadFactory);
}

/**
Expand Down