Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ public final class ReconServerConfigKeys {
"ozone.recon.dn.metrics.collection.timeout";
public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT = "10m";

// Config key controlling how many worker threads Recon uses to collect
// DataNode metrics concurrently.
public static final String OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT =
"ozone.recon.dn.metrics.collection.thread.count";
// Default worker count; evaluated once at class initialization as 2x the
// available processors of the host running Recon.
public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT =
Runtime.getRuntime().availableProcessors() * 2;

/**
* Application-level ceiling on the number of ContainerIDs fetched from SCM
* per RPC call during container sync. The effective batch size is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
Expand Down Expand Up @@ -73,7 +72,7 @@ public class TarExtractor {
/**
 * Creates a tar extractor backed by a pool of {@code threadPoolSize} workers.
 *
 * @param threadPoolSize   number of worker threads used to write extracted files
 * @param threadNamePrefix caller-supplied prefix for worker thread names
 */
public TarExtractor(int threadPoolSize, String threadNamePrefix) {
  this.threadPoolSize = threadPoolSize;
  // Prefix first so threads group under the caller's namespace,
  // e.g. "<prefix>FetchOMDBTar-0" (previously the prefix was appended after
  // the "%d" format token, producing names like "FetchOMDBTar-0<prefix>").
  this.threadFactory =
      new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "FetchOMDBTar-%d")
          .build();
}

Expand Down Expand Up @@ -163,8 +162,7 @@ private void writeFile(Path outputDir, String fileName, byte[] fileData) {

public void start() {
if (executorServiceStarted.compareAndSet(false, true)) {
this.executor =
new ThreadPoolExecutor(0, threadPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
this.executor = Executors.newFixedThreadPool(threadPoolSize, threadFactory);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: The pool size is Math.max(64, Runtime.getRuntime().availableProcessors()) at the call site in OzoneManagerServiceProviderImpl. On high-CPU machines this could be large. With the old code it barely mattered; now it will actually create that many threads. On a 256-core host that's 256 threads all blocking on I/O at the same time. Consider documenting this, or — in my opinion — capping it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have switched from max to min. We should now be capped to 64 threads.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT;

Expand All @@ -32,9 +34,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -61,11 +64,9 @@
public class DataNodeMetricsService {

private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class);
private static final int MAX_POOL_SIZE = 500;
private static final int KEEP_ALIVE_TIME = 5;
private static final int POLL_INTERVAL_MS = 200;

private final ThreadPoolExecutor executorService;
private final ExecutorService executorService;
private final ReconNodeManager reconNodeManager;
private final boolean httpsEnabled;
private final int minimumApiDelayMs;
Expand Down Expand Up @@ -96,14 +97,15 @@ public DataNodeMetricsService(
OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
this.metricsServiceProviderFactory = metricsServiceProviderFactory;
this.lastCollectionEndTime.set(-minimumApiDelayMs);
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
this.executorService = new ThreadPoolExecutor(
corePoolSize, MAX_POOL_SIZE,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setNameFormat("DataNodeMetricsCollector-%d")
.build());
int corePoolSize = config.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT,
OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT);
corePoolSize = corePoolSize > 0
? corePoolSize
: OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT;
ThreadFactory threadFactory = new ThreadFactoryBuilder()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original pool was intended (if misconfigured) to scale up to 500 threads under load (e.g., Recon polling thousands of DataNodes simultaneously). The fix correctly reflects actual behavior today, but it also prevents efficient future scaling. If the intent was truly to allow bursting to a larger pool, consider using a bounded queue rather than simply removing it. At minimum, add a comment explaining the corePoolSize choice and the impact of removing MAX_POOL_SIZE = 500.

But wait, this could make it slower in large clusters with hundreds of DataNodes, where it makes concurrent DataNode queries. The original MAX_POOL_SIZE = 500 captured the right intent — it was just broken by the unbounded LinkedBlockingQueue. A better fix would be to make the pool size configurable with a sensible default:

// e.g., add a config key OZONE_RECON_DN_METRICS_THREAD_COUNT, default 128 or min(nodeCount, 500)
int poolSize = config.getInt(OZONE_RECON_DN_METRICS_THREAD_COUNT, 128);
this.executorService = Executors.newFixedThreadPool(poolSize, threadFactory);

.setNameFormat("DataNodeMetricsCollector-%d")
.build();
this.executorService = Executors.newFixedThreadPool(corePoolSize, threadFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public OzoneManagerServiceProviderImpl(
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "SyncOM-%d")
.build();
// Number of parallel workers
int omDBTarProcessorThreadCount = Math.max(64, Runtime.getRuntime().availableProcessors());
int omDBTarProcessorThreadCount = Math.min(64, Runtime.getRuntime().availableProcessors());
this.reconContext = reconContext;
this.taskStatusUpdaterManager = taskStatusUpdaterManager;
this.omDBLagThreshold = configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.recon;

import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.io.File;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link TarExtractor}.
 *
 * <p>Verifies that extracting a single tar stream fans file writes out across
 * the extractor's configured worker thread pool rather than running serially.
 */
public class TestTarExtractor {

  @TempDir
  private Path temporaryFolder;

  /**
   * Extracts one tar containing poolSize * 4 large (~4 MiB) files while
   * sampling all live thread stacks, and asserts that each of the poolSize
   * worker threads is eventually observed inside the write path.
   */
  @Test
  public void testExtractTarUsesMultipleThreadsForSingleTar() throws Exception {
    Path sourceDir = Files.createDirectory(temporaryFolder.resolve("source"));
    int poolSize = 4;
    // More files than workers so every pool thread has work queued.
    int fileCount = poolSize * 4;
    // Large payloads keep each write busy long enough for the stack-sampling
    // loop below to catch the worker threads in the act.
    byte[] payload = new byte[4 * 1024 * 1024];
    Arrays.fill(payload, (byte) 'a');
    for (int i = 0; i < fileCount; i++) {
      // Append one distinguishing byte so each file's content is unique.
      byte[] content = Arrays.copyOf(payload, payload.length + 1);
      content[payload.length] = (byte) ('0' + (i % 10));
      Files.write(sourceDir.resolve("file" + i + ".sst"), content);
    }

    File tarFile = createTarFile(sourceDir);
    Path outputDir = temporaryFolder.resolve("output");
    String threadPrefix = "TestTarExtractorParallel-";

    TarExtractor extractor = new TarExtractor(poolSize, threadPrefix);
    // Names of extractor worker threads ever observed inside the write path.
    Set<String> writeFileThreads = ConcurrentHashMap.newKeySet();
    AtomicReference<Throwable> extractionError = new AtomicReference<>();

    extractor.start();
    // Run extraction on its own thread so this test thread is free to sample
    // stack traces concurrently; failures are captured for rethrow below.
    Thread extractionThread = new Thread(() -> {
      try (InputStream input = Files.newInputStream(tarFile.toPath())) {
        extractor.extractTar(input, outputDir);
      } catch (Throwable t) {
        extractionError.set(t);
      }
    });

    try {
      extractionThread.start();

      // Poll all live thread stacks (up to 30s) until every pool thread has
      // been seen executing TarExtractor's write path at least once, or
      // extraction finishes.
      long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
      while (extractionThread.isAlive()
          && System.nanoTime() < deadlineNanos
          && writeFileThreads.size() < poolSize) {
        Thread.getAllStackTraces().forEach((thread, stackTrace) -> {
          if (thread.getName().startsWith(threadPrefix)
              && isInTarExtractorWritePath(stackTrace)) {
            writeFileThreads.add(thread.getName());
          }
        });
        Thread.sleep(1);
      }

      extractionThread.join(TimeUnit.SECONDS.toMillis(30));
      assertFalse(extractionThread.isAlive(), "Tar extraction did not finish in time");

      if (extractionError.get() != null) {
        throw new AssertionError("Tar extraction failed", extractionError.get());
      }

      // Observations are cumulative, not simultaneous: the assertion is that
      // each of the poolSize workers participated at some point.
      assertEquals(poolSize, writeFileThreads.size(),
          "Expected writeFile to run in parallel across all configured worker threads");
    } finally {
      extractor.stop();
    }
  }

  /**
   * Returns true if the given stack trace is currently inside
   * {@code TarExtractor.writeFile} or the extraction lambda.
   *
   * <p>NOTE(review): the literal "lambda$extractTar$0" depends on javac's
   * synthetic lambda-method naming and on that lambda remaining the first one
   * declared in extractTar — confirm it still matches if TarExtractor is
   * refactored.
   */
  private static boolean isInTarExtractorWritePath(StackTraceElement[] stackTrace) {
    for (StackTraceElement stackElement : stackTrace) {
      if (TarExtractor.class.getName().equals(stackElement.getClassName())
          && ("writeFile".equals(stackElement.getMethodName())
              || "lambda$extractTar$0".equals(stackElement.getMethodName()))) {
        return true;
      }
    }
    return false;
  }
}